2 * Copyright 2006, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
13 * OpenAFS demand attach fileserver
14 * Salvage server synchronization with fileserver.
17 /* This controls the size of an fd_set; it must be defined early before
18 * the system headers define that type and the macros that operate on it.
19 * Its value should be as large as the maximum file descriptor limit we
20 * are likely to run into on any platform. Right now, that is 65536
21 * which is the default hard fd limit on Solaris 9 */
23 #define FD_SETSIZE 65536
26 #include <afsconfig.h>
27 #include <afs/param.h>
32 #include <sys/types.h>
38 #include <sys/param.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
58 #include <afs/afsint.h>
60 #include <afs/errors.h>
64 #include <afs/afssyscalls.h>
68 #include "partition.h"
69 #include <rx/rx_queue.h>
71 #if !defined(offsetof)
75 #ifdef USE_UNIX_SOCKETS
76 #include <afs/afsutil.h>
81 /*@printflike@*/ extern void Log(const char *format, ...);
86 #define osi_Assert(e) (void)(e)
88 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
89 * move = dump+restore can run on single server */
91 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
95 /* Forward declarations */
96 static void * SALVSYNC_syncThread(void *);
97 static void SALVSYNC_newconnection(int fd);
98 static void SALVSYNC_com(int fd);
99 static void SALVSYNC_Drop(int fd);
100 static void AcceptOn(void);
101 static void AcceptOff(void);
102 static void InitHandler(void);
103 static void CallHandler(fd_set * fdsetp);
104 static int AddHandler(int afd, void (*aproc) (int));
105 static int FindHandler(register int afd);
106 static int FindHandler_r(register int afd);
107 static int RemoveHandler(register int afd);
108 static void GetHandler(fd_set * fdsetp, int *maxfdp);
112 * This lock controls access to the handler array.
114 struct Lock SALVSYNC_handler_lock;
117 #ifdef AFS_DEMAND_ATTACH_FS
119 * SALVSYNC is a feature specific to the demand attach fileserver
122 static int AddToSalvageQueue(struct SalvageQueueNode * node);
123 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
124 static void AddToPendingQueue(struct SalvageQueueNode * node);
125 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
126 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
127 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
128 static void RaiseCommandPrio(struct SalvageQueueNode * node, SALVSYNC_command_hdr * com);
130 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName);
131 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry);
132 static void AddNodeToHash(struct SalvageQueueNode * node);
133 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
135 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
136 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
137 static afs_int32 SALVSYNC_com_RaisePrio(SALVSYNC_command * com, SALVSYNC_response * res);
138 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
139 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
144 extern pthread_mutex_t vol_salvsync_mutex;
146 static int AcceptSd = -1; /* Socket used by server for accepting connections */
149 /* be careful about rearranging elements in this structure.
150 * element placement has been optimized for locality of reference
151 * in SALVSYNC_getWork() */
152 struct SalvageQueue {
153 volatile int total_len;
154 volatile afs_int32 last_insert; /* id of last partition to have a salvage node insert */
155 volatile int len[VOLMAXPARTS+1];
156 volatile struct rx_queue part[VOLMAXPARTS+1];
159 static struct SalvageQueue salvageQueue; /* volumes waiting to be salvaged */
162 volatile struct rx_queue q;
164 pthread_cond_t queue_change_cv;
166 static struct QueueHead pendingQueue; /* volumes being salvaged */
169 * whether a partition has a salvage in progress
171 * the salvager code only permits one salvage per partition at a time
 * the following hack tries to keep salvage parallelism high by
174 * only permitting one salvage dispatch per partition at a time
176 * unfortunately, the parallel salvager currently
177 * has a rather braindead routine that won't permit
178 * multiple salvages on the same "device". this
179 * function happens to break pretty badly on lvm, raid luns, etc.
181 * this hack isn't good enough to stop the device limiting code from
182 * crippling performance. someday that code needs to be rewritten
184 static int partition_salvaging[VOLMAXPARTS+1];
186 #define VSHASH_SIZE 64
187 #define VSHASH_MASK (VSHASH_SIZE-1)
188 #define VSHASH(vid) ((vid)&VSHASH_MASK)
190 static struct QueueHead SalvageHashTable[VSHASH_SIZE];
/*
 * Look up a salvage queue node by volume id and partition name.
 *
 * Returns the matching node, or NULL when no queued/pending salvage is
 * known for (vid, partName).  Caller is expected to hold the volume glock.
 *
 * NOTE(review): the forward declaration uses VolumeId for vid while this
 * definition uses afs_uint32 — confirm the two types agree.
 */
static struct SalvageQueueNode *
LookupNode(afs_uint32 vid, char * partName)
    struct rx_queue *qp, *nqp;
    struct SalvageQueueNode *vsp;
    int idx = VSHASH(vid);	/* hash bucket for this volume id */

    /* walk the bucket's chain looking for a node with matching volume
     * id and partition name */
    for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
	/* hash_chain is embedded mid-struct, so recover the enclosing
	 * node with explicit pointer arithmetic rather than a queue cast */
	vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
	if ((vsp->command.sop.volume == vid) &&
	    !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
    /* scan ran off the end of the chain => not found */
    if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
/* Convenience wrapper: look up a salvage queue node using the volume id
 * and partition name carried in a command header. */
static struct SalvageQueueNode *
LookupNodeByCommand(SALVSYNC_command_hdr * qry)
    return LookupNode(qry->volume, qry->partName);
/* Insert a queue node into the volume-id hash table and bump the
 * bucket length.  No-op when the node is already on a hash chain. */
AddNodeToHash(struct SalvageQueueNode * node)
    int idx = VSHASH(node->command.sop.volume);

    if (queue_IsOnQueue(&node->hash_chain)) {
    queue_Append(&SalvageHashTable[idx], &node->hash_chain);
    SalvageHashTable[idx].len++;
/* Remove a queue node from the volume-id hash table and decrement the
 * bucket length.  No-op when the node is not currently hashed. */
DeleteNodeFromHash(struct SalvageQueueNode * node)
    int idx = VSHASH(node->command.sop.volume);

    if (queue_IsNotOnQueue(&node->hash_chain)) {
    queue_Remove(&node->hash_chain);
    SalvageHashTable[idx].len--;
/*
 * One-time initialization of the SALVSYNC server: set up the per-partition
 * scheduling queues, the pending queue, and the volume-id hash table, then
 * spawn the detached listener thread (SALVSYNC_syncThread).
 */
SALVSYNC_salvInit(void)
    pthread_attr_t tattr;

    /* initialize the queues */
    assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
    for (i = 0; i <= VOLMAXPARTS; i++) {
	queue_Init(&salvageQueue.part[i]);
	salvageQueue.len[i] = 0;
    assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
    queue_Init(&pendingQueue);
    salvageQueue.total_len = pendingQueue.len = 0;
    salvageQueue.last_insert = -1;	/* no partition has inserted yet */
    memset(partition_salvaging, 0, sizeof(partition_salvaging));

    /* empty hash buckets, each with its own change condvar */
    for (i = 0; i < VSHASH_SIZE; i++) {
	assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
	SalvageHashTable[i].len = 0;
	queue_Init(&SalvageHashTable[i]);

    /* start the salvsync thread */
    assert(pthread_attr_init(&tattr) == 0);
    assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
    assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
276 #ifdef USE_UNIX_SOCKETS
278 getport(struct sockaddr_un *addr)
281 char tbuffer[AFSDIR_PATH_MAX];
283 strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
284 "fssync.sock", NULL);
286 memset(addr, 0, sizeof(*addr));
287 addr->sun_family = AF_UNIX;
288 strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
289 assert((sd = socket(AF_UNIX, SOCK_STREAM, 0)) >= 0);
/*
 * Create the SALVSYNC listener socket (inet flavor) and fill in its
 * address: loopback (127.0.0.1) on port 2041, so only local processes
 * can reach the salvage server.
 */
getport(struct sockaddr_in *addr)
    memset(addr, 0, sizeof(*addr));
    assert((sd = socket(AF_INET, SOCK_STREAM, 0)) >= 0);
#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
    addr->sin_len = sizeof(struct sockaddr_in);
    addr->sin_addr.s_addr = htonl(0x7f000001);	/* loopback only */
    addr->sin_family = AF_INET;	/* was localhost->h_addrtype */
    addr->sin_port = htons(2041);	/* XXXX htons not _really_ necessary */
/* fd set handed to select(); rebuilt from the handler table each loop */
static fd_set SALVSYNC_readfds;

/*
 * Main loop of the SALVSYNC server thread: set an rx thread id, create
 * and bind the listener socket (retrying the bind a bounded number of
 * times), then forever select() on the registered fds and dispatch
 * ready ones to their handlers.
 */
SALVSYNC_syncThread(void * args)
    struct sockaddr_in addr;
#ifdef USE_UNIX_SOCKETS
    char tbuffer[AFSDIR_PATH_MAX];

    /* a client that disconnects mid-write must not kill the server */
    (void)signal(SIGPIPE, SIG_IGN);

    /* set our 'thread-id' so that the host hold table works */
    MUTEX_ENTER(&rx_stats_mutex);	/* protects rxi_pthread_hinum */
    tid = ++rxi_pthread_hinum;
    MUTEX_EXIT(&rx_stats_mutex);
    pthread_setspecific(rx_thread_id_key, (void *)tid);
    Log("Set thread id %d for SALVSYNC_syncThread\n", tid);

#ifdef USE_UNIX_SOCKETS
    strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
	       "fssync.sock", NULL);
#endif /* USE_UNIX_SOCKETS */

    AcceptSd = getport(&addr);
    /* Reuseaddr needed because system inexplicably leaves crud lying around */
    setsockopt(AcceptSd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
	Log("SALVSYNC_sync: setsockopt failed with (%d)\n", errno);

    /* bind can transiently fail (stale socket); retry a few times */
    for (numTries = 0; numTries < MAX_BIND_TRIES; numTries++) {
	    bind(AcceptSd, (struct sockaddr *)&addr, sizeof(addr))) == 0)
	Log("SALVSYNC_sync: bind failed with (%d), will sleep and retry\n",
    listen(AcceptSd, 100);

	GetHandler(&SALVSYNC_readfds, &maxfd);
	/* Note: check for >= 1 below is essential since IOMGR_select
	 * doesn't have exactly same semantics as select.
	 */
	if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
	    CallHandler(&SALVSYNC_readfds);
/*
 * Accept a new client connection on the listening fd and register the
 * resulting fd with the SALVSYNC_com command handler.
 */
SALVSYNC_newconnection(int afd)
#ifdef USE_UNIX_SOCKETS
    struct sockaddr_un other;
#else /* USE_UNIX_SOCKETS */
    struct sockaddr_in other;

    junk = sizeof(other);
    fd = accept(afd, (struct sockaddr *)&other, &junk);
	Log("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
    } else if (!AddHandler(fd, SALVSYNC_com)) {
	/* handler table was full: turning accept off frees its slot, so
	 * this retry must succeed */
	assert(AddHandler(fd, SALVSYNC_com));
/* this function processes commands from a salvsync file descriptor (fd) */
static afs_int32 SALV_cnt = 0;	/* running count of commands processed */
    SALVSYNC_response_hdr sres_hdr;
    SALVSYNC_command scom;
    SALVSYNC_response sres;
    SYNC_PROTO_BUF_DECL(buf);

    /* wire up the generic SYNC envelope to point at our typed payloads */
    com.payload.buf = (void *)buf;
    com.payload.len = SYNC_PROTO_MAX_LEN;
    res.payload.buf = (void *) &sres_hdr;
    res.payload.len = sizeof(sres_hdr);
    res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
    res.hdr.proto_version = SALVSYNC_PROTO_VERSION;

    scom.sop = (SALVSYNC_command_hdr *) buf;
    sres.sop = &sres_hdr;

    /* read the command off the wire; on failure, drop the connection */
    if (SYNC_getCom(fd, &com)) {
	Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt);

    /* reject clients speaking a different protocol version */
    if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
	Log("SALVSYNC_com: invalid protocol version (%u)\n", com.hdr.proto_version);
	res.hdr.response = SYNC_COM_ERROR;
	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

    /* every SALVSYNC command carries exactly one SALVSYNC_command_hdr */
    if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
	Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
	res.hdr.response = SYNC_COM_ERROR;
	res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

    /* dispatch on the command code */
    switch (com.hdr.command) {
    case SALVSYNC_SALVAGE:
	res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
    case SALVSYNC_CANCEL:
	/* cancel a salvage */
	res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
    case SALVSYNC_CANCELALL:
	/* cancel all queued salvages */
	res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
    case SALVSYNC_RAISEPRIO:
	/* raise the priority of a salvage */
	res.hdr.response = SALVSYNC_com_RaisePrio(&scom, &sres);
	/* query whether a volume is done salvaging */
	res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
    case SYNC_COM_CHANNEL_CLOSE:
	res.hdr.response = SYNC_OK;
	res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
	res.hdr.response = SYNC_BAD_COMMAND;

    /* every response reports current queue depths for monitoring */
    sres_hdr.sq_len = salvageQueue.total_len;
    sres_hdr.pq_len = pendingQueue.len;

    SYNC_putRes(fd, &res);
    if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
/*
 * Handle SALVSYNC_SALVAGE: schedule a salvage for the volume named in
 * the command.  If a node already exists for the volume, requeue it when
 * its previous salvage finished (DONE/ERROR); otherwise allocate a new
 * node and enqueue it.  On success the response carries the node's
 * state and priority.
 */
SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    /* partition name must be a properly terminated protocol string */
    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    node = LookupNodeByCommand(com->sop);

    /* schedule a salvage for this volume */
	switch (node->state) {
	case SALVSYNC_STATE_ERROR:
	case SALVSYNC_STATE_DONE:
	    /* previous salvage finished: refresh the stored command and
	     * requeue at default priority */
	    memcpy(&node->command.com, com->hdr, sizeof(SYNC_command_hdr));
	    memcpy(&node->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
	    node->command.sop.prio = 0;
	    if (AddToSalvageQueue(node)) {

	/* no node yet for this volume: allocate and populate one */
	node = (struct SalvageQueueNode *) malloc(sizeof(struct SalvageQueueNode));
	/* NOTE(review): malloc result presumably checked in elided lines —
	 * confirm against full source */
	memset(node, 0, sizeof(struct SalvageQueueNode));
	memcpy(&node->command.com, com->hdr, sizeof(SYNC_command_hdr));
	memcpy(&node->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));

	if (AddToSalvageQueue(node)) {
	    /* enqueue failed (bad/unmounted partition): undo the hash insert */
	    DeleteNodeFromHash(node);

    res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
    res->sop->state = node->state;
    res->sop->prio = node->command.sop.prio;
/*
 * Handle SALVSYNC_CANCEL: dequeue a scheduled (not yet running) salvage
 * for the named volume.  A salvage already in progress is left alone;
 * the response reports the node's state either way.
 */
SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    node = LookupNodeByCommand(com->sop);
	res->sop->state = SALVSYNC_STATE_UNKNOWN;	/* volume not known here */
	res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
	res->sop->prio = node->command.sop.prio;
	res->sop->state = node->state;
	if (node->state == SALVSYNC_STATE_QUEUED) {
	    /* still only scheduled — safe to drop from the queue */
	    DeleteFromSalvageQueue(node);
/*
 * Handle SALVSYNC_CANCELALL: drop every queued (not yet dispatched)
 * salvage on every known disk partition.  In-progress salvages are
 * not touched.
 */
SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
    struct SalvageQueueNode * np, *nnp;
    struct DiskPartition * dp;

    for (dp = DiskPartitionList ; dp ; dp = dp->next) {
	for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
	    DeleteFromSalvageQueue(np);
/*
 * Handle SALVSYNC_RAISEPRIO: raise the scheduling priority of a salvage.
 * If the volume has no node yet, or its salvage already finished, this
 * degenerates to scheduling a fresh salvage via SALVSYNC_com_Salvage.
 */
SALVSYNC_com_RaisePrio(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    node = LookupNodeByCommand(com->sop);

    /* raise the priority of a salvage */
	/* unknown volume: schedule it, then re-lookup the new node */
	code = SALVSYNC_com_Salvage(com, res);
	node = LookupNodeByCommand(com->sop);

	switch (node->state) {
	case SALVSYNC_STATE_QUEUED:
	    /* still waiting: reposition it within its partition queue */
	    RaiseCommandPrio(node, com->sop);
	case SALVSYNC_STATE_SALVAGING:
	case SALVSYNC_STATE_ERROR:
	case SALVSYNC_STATE_DONE:
	    /* finished (or failed): schedule a fresh salvage instead */
	    code = SALVSYNC_com_Salvage(com, res);

	res->sop->state = SALVSYNC_STATE_UNKNOWN;
	res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
	res->sop->prio = node->command.sop.prio;
	res->sop->state = node->state;
/*
 * Handle SALVSYNC_QUERY: report the current state and priority of any
 * salvage known for the named volume (UNKNOWN when none is tracked).
 */
SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
	res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;

    node = LookupNodeByCommand(com->sop);

    /* query whether a volume is done salvaging */
	res->sop->state = SALVSYNC_STATE_UNKNOWN;	/* volume not tracked */
	res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
	res->sop->state = node->state;
	res->sop->prio = node->command.sop.prio;
/* Tear down a client connection on fd — presumably closes the fd and
 * removes its handler registration; body not shown here, confirm
 * against full source. */
SALVSYNC_Drop(int fd)
static int AcceptHandler = -1;	/* handler id for accept, if turned on */

/* Begin accepting connections: register the accept socket with the
 * handler table (idempotent — does nothing if already registered). */
    if (AcceptHandler == -1) {
	assert(AddHandler(AcceptSd, SALVSYNC_newconnection));
	AcceptHandler = FindHandler(AcceptSd);
/* Stop accepting connections: deregister the accept socket's handler,
 * freeing a slot in the (small) handler table. No-op if already off. */
    if (AcceptHandler != -1) {
	assert(RemoveHandler(AcceptSd));
/* The multiple FD handling code. */

static int HandlerFD[MAXHANDLERS];		/* fd per slot; -1 marks a free slot */
static void (*HandlerProc[MAXHANDLERS]) (int);	/* callback per slot */

/* Reset the handler table: mark every slot free (fd -1, no callback). */
    ObtainWriteLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++) {
	HandlerProc[i] = NULL;
    ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Invoke the registered callback for every handler fd present in fdsetp.
 * The handler lock is dropped around each callback so the handler may
 * itself add or remove table entries; the lock is re-acquired afterwards.
 */
CallHandler(fd_set * fdsetp)
    ObtainReadLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++) {
	if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
	    ReleaseReadLock(&SALVSYNC_handler_lock);	/* handler may mutate the table */
	    (*HandlerProc[i]) (HandlerFD[i]);
	    ObtainReadLock(&SALVSYNC_handler_lock);
    ReleaseReadLock(&SALVSYNC_handler_lock);
/*
 * Register (afd, aproc) in the first free handler-table slot.
 * Returns 0 when the table is full, nonzero on success.
 */
AddHandler(int afd, void (*aproc) (int))
    ObtainWriteLock(&SALVSYNC_handler_lock);
    /* find the first free slot (HandlerFD == -1) */
    for (i = 0; i < MAXHANDLERS; i++)
	if (HandlerFD[i] == -1)
    if (i >= MAXHANDLERS) {
	/* table full: caller must free a slot (e.g. AcceptOff) and retry */
	ReleaseWriteLock(&SALVSYNC_handler_lock);
    HandlerProc[i] = aproc;
    ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Return the handler-table index registered for afd.
 * Takes the handler lock itself; see FindHandler_r for the variant used
 * when the caller already holds the lock.
 */
FindHandler(register int afd)
    ObtainReadLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++)
	if (HandlerFD[i] == afd) {
	    ReleaseReadLock(&SALVSYNC_handler_lock);
    ReleaseReadLock(&SALVSYNC_handler_lock);	/* just in case */
    return -1;			/* satisfy compiler */
/* Like FindHandler, but assumes the caller already holds the
 * handler lock. Returns the table index of afd. */
FindHandler_r(register int afd)
    for (i = 0; i < MAXHANDLERS; i++)
	if (HandlerFD[i] == afd) {
    return -1;			/* satisfy compiler */
775 RemoveHandler(register int afd)
777 ObtainWriteLock(&SALVSYNC_handler_lock);
778 HandlerFD[FindHandler_r(afd)] = -1;
779 ReleaseWriteLock(&SALVSYNC_handler_lock);
/*
 * Build the fd_set of all registered handler fds and report the largest
 * fd seen via *maxfdp, for use with select().
 * NOTE(review): assumes fdsetp is cleared (FD_ZERO) in the lines not
 * shown here — confirm against full source.
 */
GetHandler(fd_set * fdsetp, int *maxfdp)
    register int maxfd = -1;

    ObtainReadLock(&SALVSYNC_handler_lock);	/* just in case */
    for (i = 0; i < MAXHANDLERS; i++)
	if (HandlerFD[i] != -1) {
	    FD_SET(HandlerFD[i], fdsetp);
	    if (maxfd < HandlerFD[i])
		maxfd = HandlerFD[i];
    ReleaseReadLock(&SALVSYNC_handler_lock);	/* just in case */
/*
 * Append a node to the per-partition salvage scheduling queue and mark
 * it QUEUED.  Fails (nonzero, per the call sites' error handling) when
 * the partition name is invalid or the partition is not mounted.
 */
AddToSalvageQueue(struct SalvageQueueNode * node)
    id = volutil_GetPartitionID(node->command.sop.partName);
    if (id < 0 || id > VOLMAXPARTS) {
    if (!VGetPartitionById_r(id, 0)) {
	/* don't enqueue salvage requests for unmounted partitions */
    queue_Append(&salvageQueue.part[id], node);
    salvageQueue.len[id]++;
    salvageQueue.total_len++;
    salvageQueue.last_insert = id;	/* remembered for getWork's fast path */
    node->partition_id = id;
    node->state = SALVSYNC_STATE_QUEUED;
    /* wake any thread blocked in SALVSYNC_getWork */
    assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
/* Remove a node from the salvage scheduling queue, keeping the
 * per-partition and total length counters consistent, and notify
 * queue waiters. No-op if the node is not queued. */
DeleteFromSalvageQueue(struct SalvageQueueNode * node)
    if (queue_IsOnQueue(node)) {
	salvageQueue.len[node->partition_id]--;
	salvageQueue.total_len--;
	node->state = SALVSYNC_STATE_UNKNOWN;
	assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
/* Move a node onto the queue of salvages currently being worked on,
 * mark it SALVAGING, and wake pending-queue waiters. */
AddToPendingQueue(struct SalvageQueueNode * node)
    queue_Append(&pendingQueue, node);
    node->state = SALVSYNC_STATE_SALVAGING;
    assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
/* Remove a node from the pending (in-progress) queue and wake
 * pending-queue waiters. No-op if the node is not queued. */
DeleteFromPendingQueue(struct SalvageQueueNode * node)
    if (queue_IsOnQueue(node)) {
	node->state = SALVSYNC_STATE_UNKNOWN;
	assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
/*
 * Find the in-progress (pending) salvage whose volume id and partition
 * name match the query header; NULL if no such salvage is pending.
 */
static struct SalvageQueueNode *
LookupPendingCommand(SALVSYNC_command_hdr * qry)
    struct SalvageQueueNode * np, * nnp;

    for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
	if ((np->command.sop.volume == qry->volume) &&
	    !strncmp(np->command.sop.partName, qry->partName,
		     sizeof(qry->partName)))
    /* walked off the end => not found */
    if (queue_IsEnd(&pendingQueue, np))
/* Find the in-progress salvage owned by child process pid; NULL if no
 * pending salvage matches. */
static struct SalvageQueueNode *
LookupPendingCommandByPid(int pid)
    struct SalvageQueueNode * np, * nnp;

    for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
    /* walked off the end => not found */
    if (queue_IsEnd(&pendingQueue, np))
/* raise the priority of a previously scheduled salvage */
RaiseCommandPrio(struct SalvageQueueNode * node, SALVSYNC_command_hdr * com)
    struct SalvageQueueNode *np, *nnp;

    assert(queue_IsOnQueue(node));	/* only valid for QUEUED nodes */

    node->command.sop.prio = com->prio;
    id = node->partition_id;
    /* fast path: new priority beats the current queue head, so the node
     * simply moves to the front of its partition's queue */
    if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < com->prio) {
	queue_Prepend(&salvageQueue.part[id], node);
	/* otherwise scan backwards for the insertion point that keeps the
	 * queue ordered by descending priority */
	for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
	    if (np->command.sop.prio > com->prio)
	if (queue_IsEnd(&salvageQueue.part[id], np)) {
	    queue_Prepend(&salvageQueue.part[id], node);
	} else if (node != np) {
	    queue_InsertAfter(np, node);
/* this will need to be rearchitected if we ever want more than one thread
 * to wait for new salvage nodes */
/*
 * Block until a salvage is schedulable, pick the next node to salvage
 * (round-robin across partitions, preferring partitions with no salvage
 * already dispatched), move it from the scheduling queue to the pending
 * queue, and return it.  Caller is expected to hold the volume glock.
 */
struct SalvageQueueNode *
SALVSYNC_getWork(void)
    struct DiskPartition * dp = NULL, * fdp;
    static afs_int32 next_part_sched = 0;	/* round-robin cursor across partitions */
    struct SalvageQueueNode *node = NULL, *np;

    /*
     * wait for work to be scheduled
     * if there are no disk partitions, just sit in this wait loop forever
     */
    while (!salvageQueue.total_len || !DiskPartitionList) {
	assert(pthread_cond_wait(&salvageQueue.cv, &vol_glock_mutex) == 0);

    /*
     * short circuit for simple case where only one partition has
     * scheduled salvages
     */
    if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
	(salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
	node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);

    /*
     * ok, more than one partition has scheduled salvages.
     * now search for partitions with scheduled salvages, but no pending salvages.
     */
    dp = VGetPartitionById_r(next_part_sched, 0);
	dp = DiskPartitionList;	/* cursor pointed at a vanished partition */
	 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
	if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
	    node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);

    /*
     * all partitions with scheduled salvages have at least one pending.
     * now do an exhaustive search for a scheduled salvage.
     */
	 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
	if (salvageQueue.len[dp->index]) {
	    node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);

    /* we should never reach this line */
    assert(node != NULL);

    /* dispatch: mark the partition busy and move node to pending */
    partition_salvaging[node->partition_id]++;
    DeleteFromSalvageQueue(node);
    AddToPendingQueue(node);

    /* update next_part_sched field */
	next_part_sched = dp->next->index;
    } else if (DiskPartitionList) {
	next_part_sched = DiskPartitionList->index;
	next_part_sched = -1;
/*
 * Mark a pending salvage as finished: remove it from the pending queue,
 * release its partition's dispatch slot, and record DONE (result == 0)
 * or ERROR (nonzero) on the node.  Caller holds the volume glock.
 */
SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
    DeleteFromPendingQueue(node);
    partid = node->partition_id;
    if (partid >=0 && partid <= VOLMAXPARTS) {
	partition_salvaging[partid]--;	/* partition may dispatch another salvage */
    /* DONE on success, ERROR otherwise (result branch elided in this view) */
    node->state = SALVSYNC_STATE_DONE;
	node->state = SALVSYNC_STATE_ERROR;
/* Convenience wrapper around SALVSYNC_doneWork_r that acquires the
 * volume glock around the call. */
SALVSYNC_doneWork(struct SalvageQueueNode * node, int result)
    SALVSYNC_doneWork_r(node, result);
/* Look up the pending salvage owned by child process pid and mark it
 * finished with the given result code. */
SALVSYNC_doneWorkByPid(int pid, int result)
    struct SalvageQueueNode * node;

    node = LookupPendingCommandByPid(pid);
	SALVSYNC_doneWork_r(node, result);
1047 #endif /* AFS_DEMAND_ATTACH_FS */