2 * Copyright 2006-2007, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
13 * OpenAFS demand attach fileserver
14 * Salvage server synchronization with fileserver.
17 /* This controls the size of an fd_set; it must be defined early before
18 * the system headers define that type and the macros that operate on it.
19 * Its value should be as large as the maximum file descriptor limit we
20 * are likely to run into on any platform. Right now, that is 65536
21 * which is the default hard fd limit on Solaris 9 */
23 #define FD_SETSIZE 65536
26 #include <afsconfig.h>
27 #include <afs/param.h>
32 #include <sys/types.h>
38 #include <sys/param.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
51 #include <afs/afsint.h>
53 #include <afs/errors.h>
57 #include <afs/afssyscalls.h>
61 #include "partition.h"
62 #include <rx/rx_queue.h>
64 #if !defined(offsetof)
68 #ifdef USE_UNIX_SOCKETS
69 #include <afs/afsutil.h>
74 /*@printflike@*/ extern void Log(const char *format, ...);
79 #define osi_Assert(e) (void)(e)
81 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
82 * move = dump+restore can run on single server */
84 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
88 /* Forward declarations */
89 static void * SALVSYNC_syncThread(void *);
90 static void SALVSYNC_newconnection(int fd);
91 static void SALVSYNC_com(int fd);
92 static void SALVSYNC_Drop(int fd);
93 static void AcceptOn(void);
94 static void AcceptOff(void);
95 static void InitHandler(void);
96 static void CallHandler(fd_set * fdsetp);
97 static int AddHandler(int afd, void (*aproc) (int));
98 static int FindHandler(register int afd);
99 static int FindHandler_r(register int afd);
100 static int RemoveHandler(register int afd);
101 static void GetHandler(fd_set * fdsetp, int *maxfdp);
105 * This lock controls access to the handler array.
107 struct Lock SALVSYNC_handler_lock;
110 #ifdef AFS_DEMAND_ATTACH_FS
112 * SALVSYNC is a feature specific to the demand attach fileserver
115 static int AllocNode(struct SalvageQueueNode ** node);
117 static int AddToSalvageQueue(struct SalvageQueueNode * node);
118 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
119 static void AddToPendingQueue(struct SalvageQueueNode * node);
120 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
121 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
122 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
123 static void UpdateCommandPrio(struct SalvageQueueNode * node);
124 static void HandlePrio(struct SalvageQueueNode * clone,
125 struct SalvageQueueNode * parent,
126 afs_uint32 new_prio);
128 static int LinkNode(struct SalvageQueueNode * parent,
129 struct SalvageQueueNode * clone);
131 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
132 struct SalvageQueueNode ** parent);
133 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
134 struct SalvageQueueNode ** parent);
135 static void AddNodeToHash(struct SalvageQueueNode * node);
136 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
138 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
139 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
140 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
141 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
142 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
147 extern pthread_mutex_t vol_salvsync_mutex;
static int AcceptSd = -1;		/* Socket used by server for accepting connections */

/*
 * queue of all volumes waiting to be salvaged.
 */
struct SalvageQueue {
    volatile int total_len;		/* total queued salvages across all partitions */
    volatile afs_int32 last_insert;	/**< id of last partition to have a salvage node inserted */
    volatile int len[VOLMAXPARTS+1];	/* per-partition queue lengths */
    volatile struct rx_queue part[VOLMAXPARTS+1];	/**< per-partition queues of pending salvages */
static struct SalvageQueue salvageQueue;	/* volumes waiting to be salvaged */

/*
 * queue of all volumes currently being salvaged.
 */
    volatile struct rx_queue q;		/**< queue of salvages in progress */
    volatile int len;			/**< length of in-progress queue */
    pthread_cond_t queue_change_cv;	/* broadcast on every queue mutation */
static struct QueueHead pendingQueue;	/* volumes being salvaged */

/*
 * whether a partition has a salvage in progress
 *
 * the salvager code only permits one salvage per partition at a time
 *
 * the following hack tries to keep salvage parallelism high by
 * only permitting one salvage dispatch per partition at a time
 *
 * unfortunately, the parallel salvager currently
 * has a rather braindead routine that won't permit
 * multiple salvages on the same "device". this
 * function happens to break pretty badly on lvm, raid luns, etc.
 *
 * this hack isn't good enough to stop the device limiting code from
 * crippling performance. someday that code needs to be rewritten
 */
static int partition_salvaging[VOLMAXPARTS+1];

/* hash table of all known salvage scheduling nodes, keyed by volume id.
 * VSHASH_SIZE must remain a power of two for VSHASH_MASK to be valid. */
#define VSHASH_SIZE 64
#define VSHASH_MASK (VSHASH_SIZE-1)
#define VSHASH(vid) ((vid)&VSHASH_MASK)

static struct QueueHead SalvageHashTable[VSHASH_SIZE];
/**
 * look up a salvage scheduling node by volume id and partition name.
 *
 * @param[in]  vid       volume id to search for
 * @param[in]  partName  partition name to match
 * @param[out] parent    (optional) set to the volume group parent node
 *
 * NOTE(review): the forward declaration takes a VolumeId vid while this
 * definition takes afs_uint32 — confirm the two types agree on all platforms.
 */
static struct SalvageQueueNode *
LookupNode(afs_uint32 vid, char * partName,
	   struct SalvageQueueNode ** parent)
    struct rx_queue *qp, *nqp;
    struct SalvageQueueNode *vsp;
    int idx = VSHASH(vid);		/* hash bucket for this volume id */

    /* walk the hash chain looking for a node matching vid + partition */
    for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
	/* recover the containing node from its embedded hash_chain link */
	vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
	if ((vsp->command.sop.volume == vid) &&
	    !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
    /* end-of-chain means no node was found */
    if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
	/* for clone nodes, report the volume group parent; otherwise the node itself */
	*parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
	    vsp->volgroup.parent : vsp;
/**
 * look up a salvage scheduling node using the volume id and partition
 * name carried in a SALVSYNC command header.
 *
 * @param[in]  qry     inbound command header (volume, partName)
 * @param[out] parent  (optional) set to the volume group parent node
 *
 * @return matching queue node, or whatever LookupNode returns on miss
 */
static struct SalvageQueueNode *
LookupNodeByCommand(SALVSYNC_command_hdr * qry,
		    struct SalvageQueueNode ** parent)
    return LookupNode(qry->volume, qry->partName, parent);
/* insert a salvage scheduling node into the volume id hash table;
 * no-op if the node is already on a hash chain */
AddNodeToHash(struct SalvageQueueNode * node)
    int idx = VSHASH(node->command.sop.volume);

    /* already hashed; nothing to do */
    if (queue_IsOnQueue(&node->hash_chain)) {

    queue_Append(&SalvageHashTable[idx], &node->hash_chain);
    SalvageHashTable[idx].len++;

/* remove a salvage scheduling node from the volume id hash table;
 * no-op if the node is not currently hashed */
DeleteNodeFromHash(struct SalvageQueueNode * node)
    int idx = VSHASH(node->command.sop.volume);

    if (queue_IsNotOnQueue(&node->hash_chain)) {

    queue_Remove(&node->hash_chain);
    SalvageHashTable[idx].len--;
/* one-time initialization of the SALVSYNC scheduler: sets up the salvage
 * queue, pending queue, partition bookkeeping and hash table, then starts
 * the detached listener thread */
SALVSYNC_salvInit(void)
    pthread_attr_t tattr;

    /* initialize the queues */
    assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
    for (i = 0; i <= VOLMAXPARTS; i++) {
	queue_Init(&salvageQueue.part[i]);
	salvageQueue.len[i] = 0;
    assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
    queue_Init(&pendingQueue);
    salvageQueue.total_len = pendingQueue.len = 0;
    salvageQueue.last_insert = -1;	/* no partition has been scheduled yet */
    memset(partition_salvaging, 0, sizeof(partition_salvaging));

    /* initialize every hash bucket */
    for (i = 0; i < VSHASH_SIZE; i++) {
	assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
	SalvageHashTable[i].len = 0;
	queue_Init(&SalvageHashTable[i]);

    /* start the salvsync thread */
    assert(pthread_attr_init(&tattr) == 0);
    assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
    assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
#ifdef USE_UNIX_SOCKETS
/* build the unix-domain listener address and create the socket.
 * NOTE(review): the socket path is named "fssync.sock" even though this is
 * the SALVSYNC listener — confirm this name is intended and does not clash
 * with the FSSYNC socket. */
getport(struct sockaddr_un *addr)
    char tbuffer[AFSDIR_PATH_MAX];

    strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
	       "fssync.sock", NULL);

    memset(addr, 0, sizeof(*addr));
    addr->sun_family = AF_UNIX;
    /* NOTE(review): strncpy does not guarantee NUL-termination if tbuffer
     * fills sun_path exactly — verify path lengths can never reach the bound */
    strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
    assert((sd = socket(AF_UNIX, SOCK_STREAM, 0)) >= 0);
/* build the loopback TCP listener address (127.0.0.1:2041) and create the socket */
getport(struct sockaddr_in *addr)
    memset(addr, 0, sizeof(*addr));
    assert((sd = socket(AF_INET, SOCK_STREAM, 0)) >= 0);
#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
    addr->sin_len = sizeof(struct sockaddr_in);
    addr->sin_addr.s_addr = htonl(0x7f000001);	/* loopback only */
    addr->sin_family = AF_INET;	/* was localhost->h_addrtype */
    addr->sin_port = htons(2041);	/* XXXX htons not _really_ necessary */
static fd_set SALVSYNC_readfds;		/* fd set rebuilt before each select() */

/* listener thread: binds the SALVSYNC socket, then loops forever
 * select()ing over all registered fds and dispatching their handlers */
SALVSYNC_syncThread(void * args)
    struct sockaddr_in addr;
#ifdef USE_UNIX_SOCKETS
    char tbuffer[AFSDIR_PATH_MAX];

    /* a dropped client must not kill the server with SIGPIPE */
    (void)signal(SIGPIPE, SIG_IGN);

    /* set our 'thread-id' so that the host hold table works */
    MUTEX_ENTER(&rx_stats_mutex);	/* protects rxi_pthread_hinum */
    tid = ++rxi_pthread_hinum;
    MUTEX_EXIT(&rx_stats_mutex);
    pthread_setspecific(rx_thread_id_key, (void *)tid);
    Log("Set thread id %d for SALVSYNC_syncThread\n", tid);

#ifdef USE_UNIX_SOCKETS
    strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
	       "fssync.sock", NULL);
#endif /* USE_UNIX_SOCKETS */

    AcceptSd = getport(&addr);
    /* Reuseaddr needed because system inexplicably leaves crud lying around */
	setsockopt(AcceptSd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
	Log("SALVSYNC_sync: setsockopt failed with (%d)\n", errno);

    /* bind may transiently fail (stale socket); retry a bounded number of times */
    for (numTries = 0; numTries < MAX_BIND_TRIES; numTries++) {
	     bind(AcceptSd, (struct sockaddr *)&addr, sizeof(addr))) == 0)
	Log("SALVSYNC_sync: bind failed with (%d), will sleep and retry\n",
    listen(AcceptSd, 100);

	GetHandler(&SALVSYNC_readfds, &maxfd);
	/* Note: check for >= 1 below is essential since IOMGR_select
	 * doesn't have exactly same semantics as select.
	 */
	if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
	    CallHandler(&SALVSYNC_readfds);
/* accept a new client connection on the listening socket afd and
 * register SALVSYNC_com as its read handler */
SALVSYNC_newconnection(int afd)
#ifdef USE_UNIX_SOCKETS
    struct sockaddr_un other;
#else /* USE_UNIX_SOCKETS */
    struct sockaddr_in other;

    junk = sizeof(other);
    fd = accept(afd, (struct sockaddr *)&other, &junk);
	Log("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
    } else if (!AddHandler(fd, SALVSYNC_com)) {
	/* handler table full; after making room, registration must succeed */
	assert(AddHandler(fd, SALVSYNC_com));
/* this function processes commands from a salvsync file descriptor (fd) */
static afs_int32 SALV_cnt = 0;		/* count of commands processed, for log context */
    SALVSYNC_response_hdr sres_hdr;
    SALVSYNC_command scom;
    SALVSYNC_response sres;
    SYNC_PROTO_BUF_DECL(buf);

    /* wire up the command/response payload buffers before reading */
    com.payload.buf = (void *)buf;
    com.payload.len = SYNC_PROTO_MAX_LEN;
    res.payload.buf = (void *) &sres_hdr;
    res.payload.len = sizeof(sres_hdr);
    res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
    res.hdr.proto_version = SALVSYNC_PROTO_VERSION;

    /* SALVSYNC-specific headers live directly inside the generic SYNC buffers */
    scom.sop = (SALVSYNC_command_hdr *) buf;
    sres.sop = &sres_hdr;

    /* read the inbound command; on failure the connection is dropped */
    if (SYNC_getCom(fd, &com)) {
	Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt);

    /* validate: message must at least contain the generic header */
    if (com.recv_len < sizeof(com.hdr)) {
	Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
	res.hdr.response = SYNC_COM_ERROR;
	res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

    /* validate: protocol version must match ours exactly */
    if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
	Log("SALVSYNC_com: invalid protocol version (%u)\n", com.hdr.proto_version);
	res.hdr.response = SYNC_COM_ERROR;
	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

    /* a clean channel close is acknowledged, then the channel shuts down */
    if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
	res.hdr.response = SYNC_OK;
	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

    /* all remaining commands carry exactly one SALVSYNC command header */
    if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
	Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
	res.hdr.response = SYNC_COM_ERROR;
	res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

    /* dispatch on the command code */
    switch (com.hdr.command) {
    case SALVSYNC_SALVAGE:
    case SALVSYNC_RAISEPRIO:
	res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
    case SALVSYNC_CANCEL:
	/* cancel a salvage */
	res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
    case SALVSYNC_CANCELALL:
	/* cancel all queued salvages */
	res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
	/* query whether a volume is done salvaging */
	res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
    case SALVSYNC_OP_LINK:
	/* link a clone to its parent in the scheduler */
	res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
	res.hdr.response = SYNC_BAD_COMMAND;

    /* every response reports current queue depths back to the client */
    sres_hdr.sq_len = salvageQueue.total_len;
    sres_hdr.pq_len = pendingQueue.len;

    SYNC_putRes(fd, &res);
    if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
/* handle SALVSYNC_SALVAGE / SALVSYNC_RAISEPRIO: schedule a salvage for the
 * requested volume, or raise the priority of one already scheduled */
SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node, * clone;

    /* reject partition names that are not well-formed protocol strings */
    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    /* find (or below, create) the scheduling node for this volume group */
    clone = LookupNodeByCommand(com->sop, &node);

	if (AllocNode(&node)) {
	    res->hdr->reason = SYNC_REASON_NOMEM;

    /* fold the requested priority into the clone/parent pair */
    HandlePrio(clone, node, com->sop->prio);

    switch (node->state) {
    case SALVSYNC_STATE_QUEUED:
	/* already queued: re-sort it under its new priority */
	UpdateCommandPrio(node);

    case SALVSYNC_STATE_ERROR:
    case SALVSYNC_STATE_DONE:
    case SALVSYNC_STATE_UNKNOWN:
	/* not currently scheduled: record the command and enqueue */
	memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
	memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
	if (AddToSalvageQueue(node)) {

    /* report the node's resulting state and priority back to the caller */
    res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
    res->sop->state = node->state;
    res->sop->prio = node->command.sop.prio;
/* handle SALVSYNC_CANCEL: cancel a scheduled (queued, not yet running)
 * salvage for the requested volume */
SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    node = LookupNodeByCommand(com->sop, NULL);

	/* no node known for this volume: nothing to cancel */
	res->sop->state = SALVSYNC_STATE_UNKNOWN;
	res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
	res->sop->prio = node->command.sop.prio;
	res->sop->state = node->state;
	/* only queued volume-group parents can be pulled off the queue;
	 * an in-progress salvage is not interrupted here */
	if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
	    (node->state == SALVSYNC_STATE_QUEUED)) {
	    DeleteFromSalvageQueue(node);
/* handle SALVSYNC_CANCELALL: drop every queued (not yet running) salvage
 * on every known disk partition */
SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
    struct SalvageQueueNode * np, *nnp;
    struct DiskPartition * dp;

    for (dp = DiskPartitionList ; dp ; dp = dp->next) {
	for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
	    DeleteFromSalvageQueue(np);
/*
 * link a queue node for a clone to its parent volume.
 */
SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * clone, * parent;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    /* lookup clone's salvage scheduling node */
    clone = LookupNodeByCommand(com->sop, NULL);
	res->hdr->reason = SALVSYNC_REASON_ERROR;

    /* lookup parent's salvage scheduling node */
    parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
    if (parent == NULL) {
	/* parent not yet known: create and register a node for it */
	if (AllocNode(&parent)) {
	    res->hdr->reason = SYNC_REASON_NOMEM;
	memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
	memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
	/* the parent node is keyed by the parent volume id, not the clone's */
	parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
	AddNodeToHash(parent);

    if (LinkNode(parent, clone)) {
/* handle SALVSYNC_QUERY: report the scheduling state and priority of the
 * requested volume without changing anything */
SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    LookupNodeByCommand(com->sop, &node);

    /* query whether a volume is done salvaging */
	res->sop->state = SALVSYNC_STATE_UNKNOWN;	/* no node: state unknown */
	res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
	res->sop->state = node->state;
	res->sop->prio = node->command.sop.prio;
/* tear down a client connection on fd (body not fully visible here) */
SALVSYNC_Drop(int fd)

static int AcceptHandler = -1;	/* handler id for accept, if turned on */

/* AcceptOn: register the listening socket so new connections are noticed */
    if (AcceptHandler == -1) {
	assert(AddHandler(AcceptSd, SALVSYNC_newconnection));
	AcceptHandler = FindHandler(AcceptSd);

/* AcceptOff: stop noticing new connections (e.g. when the table is full) */
    if (AcceptHandler != -1) {
	assert(RemoveHandler(AcceptSd));
/* The multiple FD handling code. */

static int HandlerFD[MAXHANDLERS];		/* fd per slot; -1 = free */
static void (*HandlerProc[MAXHANDLERS]) (int);	/* callback per slot */

/* InitHandler: mark every handler slot free */
    ObtainWriteLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++) {
	HandlerProc[i] = NULL;
    ReleaseWriteLock(&SALVSYNC_handler_lock);

/* invoke the registered callback for every fd that select() marked readable */
CallHandler(fd_set * fdsetp)
    ObtainReadLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++) {
	if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
	    /* NOTE(review): the lock is dropped while the callback runs, so
	     * the table may change mid-scan — confirm this is intended */
	    ReleaseReadLock(&SALVSYNC_handler_lock);
	    (*HandlerProc[i]) (HandlerFD[i]);
	    ObtainReadLock(&SALVSYNC_handler_lock);
    ReleaseReadLock(&SALVSYNC_handler_lock);
/* register (afd, aproc) in the first free handler slot;
 * fails when all MAXHANDLERS slots are in use */
AddHandler(int afd, void (*aproc) (int))
    ObtainWriteLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++)
	if (HandlerFD[i] == -1)
    if (i >= MAXHANDLERS) {
	ReleaseWriteLock(&SALVSYNC_handler_lock);
    HandlerProc[i] = aproc;
    ReleaseWriteLock(&SALVSYNC_handler_lock);

/* locked lookup: return the slot index registered for afd */
FindHandler(register int afd)
    ObtainReadLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++)
	if (HandlerFD[i] == afd) {
	    ReleaseReadLock(&SALVSYNC_handler_lock);
    ReleaseReadLock(&SALVSYNC_handler_lock);	/* just in case */
    return -1;			/* satisfy compiler */

/* lock-free variant of FindHandler; caller must hold SALVSYNC_handler_lock */
FindHandler_r(register int afd)
    for (i = 0; i < MAXHANDLERS; i++)
	if (HandlerFD[i] == afd) {
    return -1;			/* satisfy compiler */

/* unregister afd's handler slot.
 * NOTE(review): if FindHandler_r can actually return -1 here this writes
 * HandlerFD[-1] — confirm the miss path aborts before returning */
RemoveHandler(register int afd)
    ObtainWriteLock(&SALVSYNC_handler_lock);
    HandlerFD[FindHandler_r(afd)] = -1;
    ReleaseWriteLock(&SALVSYNC_handler_lock);
/* build the fd_set of all registered fds for select(), and report the
 * largest fd seen via *maxfdp */
GetHandler(fd_set * fdsetp, int *maxfdp)
    register int maxfd = -1;

    ObtainReadLock(&SALVSYNC_handler_lock);	/* just in case */
    for (i = 0; i < MAXHANDLERS; i++)
	if (HandlerFD[i] != -1) {
	    FD_SET(HandlerFD[i], fdsetp);
	    if (maxfd < HandlerFD[i])
		maxfd = HandlerFD[i];
    ReleaseReadLock(&SALVSYNC_handler_lock);	/* just in case */
/* allocate and zero a fresh salvage scheduling node; the node starts life
 * as a volume-group parent in the UNKNOWN state */
AllocNode(struct SalvageQueueNode ** node_out)
    struct SalvageQueueNode * node;

    *node_out = node = (struct SalvageQueueNode *)
	malloc(sizeof(struct SalvageQueueNode));

    memset(node, 0, sizeof(struct SalvageQueueNode));
    node->type = SALVSYNC_VOLGROUP_PARENT;
    node->state = SALVSYNC_STATE_UNKNOWN;
/* attach a clone's scheduling node to its volume-group parent, migrating
 * any scheduling state the clone accumulated onto the parent */
LinkNode(struct SalvageQueueNode * parent,
	 struct SalvageQueueNode * clone)

    /* check for attaching a clone to a clone */
    if (parent->type != SALVSYNC_VOLGROUP_PARENT) {

    /* check for pre-existing registration and openings */
    for (idx = 0; idx < VOLMAXTYPES; idx++) {
	if (parent->volgroup.children[idx] == clone) {
	if (parent->volgroup.children[idx] == NULL) {
    if (idx == VOLMAXTYPES) {

    /* link parent and child */
    parent->volgroup.children[idx] = clone;
    clone->type = SALVSYNC_VOLGROUP_CLONE;
    clone->volgroup.parent = parent;

    /* migrate the clone's scheduling state onto the parent */
    switch (clone->state) {
    case SALVSYNC_STATE_QUEUED:
	/* the parent will carry the request; the clone leaves the queue */
	DeleteFromSalvageQueue(clone);

    case SALVSYNC_STATE_SALVAGING:
	switch (parent->state) {
	case SALVSYNC_STATE_UNKNOWN:
	case SALVSYNC_STATE_ERROR:
	case SALVSYNC_STATE_DONE:
	    /* parent not scheduled yet: adopt the clone's priority and enqueue */
	    parent->command.sop.prio = clone->command.sop.prio;
	    AddToSalvageQueue(parent);

	case SALVSYNC_STATE_QUEUED:
	    /* parent already queued: fold in the clone's priority */
	    if (clone->command.sop.prio) {
		parent->command.sop.prio += clone->command.sop.prio;
		UpdateCommandPrio(parent);
/* reconcile a requested priority (new_prio) against a clone node and its
 * volume-group node, adjusting the group node's priority by the delta */
HandlePrio(struct SalvageQueueNode * clone,
	   struct SalvageQueueNode * node,

    switch (node->state) {
    case SALVSYNC_STATE_ERROR:
    case SALVSYNC_STATE_DONE:
    case SALVSYNC_STATE_UNKNOWN:
	/* node is not actively scheduled; its priority starts from scratch */
	node->command.sop.prio = 0;

    if (new_prio < clone->command.sop.prio) {
	/* strange. let's just set our delta to 1 */
	delta = new_prio - clone->command.sop.prio;

    if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
	clone->command.sop.prio = new_prio;

    node->command.sop.prio += delta;
/* enqueue a scheduling node on its partition's salvage queue, update queue
 * bookkeeping, and wake any thread waiting for work */
AddToSalvageQueue(struct SalvageQueueNode * node)
    struct SalvageQueueNode * last = NULL;

    /* resolve and validate the partition index from the command's name */
    id = volutil_GetPartitionID(node->command.sop.partName);
    if (id < 0 || id > VOLMAXPARTS) {
    if (!VGetPartitionById_r(id, 0)) {
	/* don't enqueue salvage requests for unmounted partitions */
    /* already queued: nothing to do */
    if (queue_IsOnQueue(node)) {

    /* remember the current tail so we can detect a priority inversion below */
    if (queue_IsNotEmpty(&salvageQueue.part[id])) {
	last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
    queue_Append(&salvageQueue.part[id], node);
    salvageQueue.len[id]++;
    salvageQueue.total_len++;
    salvageQueue.last_insert = id;
    node->partition_id = id;
    node->state = SALVSYNC_STATE_QUEUED;

    /* reorder, if necessary */
    if (last && last->command.sop.prio < node->command.sop.prio) {
	UpdateCommandPrio(node);

    /* wake any worker waiting in SALVSYNC_getWork */
    assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
/* remove a node from the salvage queue (no-op if not queued), reset its
 * state, and wake any waiters */
DeleteFromSalvageQueue(struct SalvageQueueNode * node)
    if (queue_IsOnQueue(node)) {
	salvageQueue.len[node->partition_id]--;
	salvageQueue.total_len--;
	node->state = SALVSYNC_STATE_UNKNOWN;
	assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);

/* move a node onto the pending (in-progress) queue and mark it SALVAGING */
AddToPendingQueue(struct SalvageQueueNode * node)
    queue_Append(&pendingQueue, node);
    node->state = SALVSYNC_STATE_SALVAGING;
    assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);

/* remove a node from the pending queue (no-op if not queued) and reset it */
DeleteFromPendingQueue(struct SalvageQueueNode * node)
    if (queue_IsOnQueue(node)) {
	node->state = SALVSYNC_STATE_UNKNOWN;
	assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
/* find the in-progress salvage matching a command's volume id and
 * partition name, scanning the pending queue linearly */
static struct SalvageQueueNode *
LookupPendingCommand(SALVSYNC_command_hdr * qry)
    struct SalvageQueueNode * np, * nnp;

    for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
	if ((np->command.sop.volume == qry->volume) &&
	    !strncmp(np->command.sop.partName, qry->partName,
		     sizeof(qry->partName)))

    /* end-of-queue means no match */
    if (queue_IsEnd(&pendingQueue, np))

/* find the in-progress salvage being performed by the salvager child
 * process with the given pid */
static struct SalvageQueueNode *
LookupPendingCommandByPid(int pid)
    struct SalvageQueueNode * np, * nnp;

    for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {

    if (queue_IsEnd(&pendingQueue, np))
/* raise the priority of a previously scheduled salvage */
/* repositions an already-queued node within its partition queue so the
 * queue stays sorted by descending priority */
UpdateCommandPrio(struct SalvageQueueNode * node)
    struct SalvageQueueNode *np, *nnp;

    assert(queue_IsOnQueue(node));

    prio = node->command.sop.prio;
    id = node->partition_id;
    /* fast path: node now outranks the current head, so move it to front */
    if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
	queue_Prepend(&salvageQueue.part[id], node);
	/* otherwise scan backwards for the first entry that outranks us */
	for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
	    if (np->command.sop.prio > prio)
	if (queue_IsEnd(&salvageQueue.part[id], np)) {
	    /* nothing outranks us: we belong at the head */
	    queue_Prepend(&salvageQueue.part[id], node);
	} else if (node != np) {
	    queue_InsertAfter(np, node);
/* this will need to be rearchitected if we ever want more than one thread
 * to wait for new salvage nodes */
/* block until a salvage is schedulable, pick one (spreading load across
 * partitions), move it from the salvage queue to the pending queue, and
 * return it */
struct SalvageQueueNode *
SALVSYNC_getWork(void)
    struct DiskPartition * dp = NULL, * fdp;
    static afs_int32 next_part_sched = 0;	/* round-robin cursor across calls */
    struct SalvageQueueNode *node = NULL, *np;

    /*
     * wait for work to be scheduled
     * if there are no disk partitions, just sit in this wait loop forever
     */
    while (!salvageQueue.total_len || !DiskPartitionList) {
	assert(pthread_cond_wait(&salvageQueue.cv, &vol_glock_mutex) == 0);

    /*
     * short circuit for simple case where only one partition has
     * scheduled salvages
     */
    if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
	(salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
	node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);

    /*
     * ok, more than one partition has scheduled salvages.
     * now search for partitions with scheduled salvages, but no pending salvages.
     */
    dp = VGetPartitionById_r(next_part_sched, 0);
	dp = DiskPartitionList;		/* cursor invalid: restart at list head */
	 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
	if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
	    node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);

    /*
     * all partitions with scheduled salvages have at least one pending.
     * now do an exhaustive search for a scheduled salvage.
     */
	 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
	if (salvageQueue.len[dp->index]) {
	    node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);

    /* we should never reach this line */
    assert(node != NULL);

    /* hand-off: partition gains an active salvage; node moves to pending */
    partition_salvaging[node->partition_id]++;
    DeleteFromSalvageQueue(node);
    AddToPendingQueue(node);

    /* update next_part_sched field */
	next_part_sched = dp->next->index;
    } else if (DiskPartitionList) {
	next_part_sched = DiskPartitionList->index;
	next_part_sched = -1;
/* record completion of a salvage: take the node off the pending queue,
 * release the partition's salvaging slot, set the final state from the
 * salvager's exit result, and propagate that state to child clones.
 * caller must hold the appropriate lock (this is the _r variant) */
SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
    DeleteFromPendingQueue(node);
    partid = node->partition_id;
    if (partid >=0 && partid <= VOLMAXPARTS) {
	partition_salvaging[partid]--;
	node->state = SALVSYNC_STATE_DONE;
    } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
	node->state = SALVSYNC_STATE_ERROR;

    /* mirror the parent's final state onto every linked clone */
    if (node->type == SALVSYNC_VOLGROUP_PARENT) {
	for (idx = 0; idx < VOLMAXTYPES; idx++) {
	    if (node->volgroup.children[idx]) {
		node->volgroup.children[idx]->state = node->state;

/* locking wrapper around SALVSYNC_doneWork_r */
SALVSYNC_doneWork(struct SalvageQueueNode * node, int result)
    SALVSYNC_doneWork_r(node, result);

/* completion entry point keyed by salvager child pid: look up the pending
 * node for that pid, then finish it via SALVSYNC_doneWork_r */
SALVSYNC_doneWorkByPid(int pid, int result)
    struct SalvageQueueNode * node;

    node = LookupPendingCommandByPid(pid);
	SALVSYNC_doneWork_r(node, result);
1218 #endif /* AFS_DEMAND_ATTACH_FS */