8eb879f8c459f3e6936faf77da5cabbbe0cde742
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 #include <afs/procmgmt.h>
30 #include <roken.h>
31
32 #include <stddef.h>
33
34 #include <afs/opr.h>
35 #include <opr/lock.h>
36 #include <afs/afsint.h>
37 #include <rx/rx_queue.h>
38
39 #include "nfs.h"
40 #include <afs/errors.h>
41 #include "salvsync.h"
42 #include "lock.h"
43 #include <afs/afssyscalls.h>
44 #include "ihandle.h"
45 #include "vnode.h"
46 #include "volume.h"
47 #include "partition.h"
48 #include "common.h"
49 #include <rx/rx_queue.h>
50
51 #ifdef USE_UNIX_SOCKETS
52 #include <afs/afsutil.h>
53 #include <sys/un.h>
54 #endif
55
56 #ifndef WCOREDUMP
57 #define WCOREDUMP(x)    ((x) & 0200)
58 #endif
59
60 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
61                                  * move = dump+restore can run on single server */
62
63
64 /*
65  * This lock controls access to the handler array.
66  */
67 struct Lock SALVSYNC_handler_lock;
68
69
70 #ifdef AFS_DEMAND_ATTACH_FS
71 /*
72  * SALVSYNC is a feature specific to the demand attach fileserver
73  */
74
75 /* Forward declarations */
76 static void * SALVSYNC_syncThread(void *);
77 static void SALVSYNC_newconnection(osi_socket fd);
78 static void SALVSYNC_com(osi_socket fd);
79 static void SALVSYNC_Drop(osi_socket fd);
80 static void AcceptOn(void);
81 static void AcceptOff(void);
82 static void InitHandler(void);
83 static void CallHandler(fd_set * fdsetp);
84 static int AddHandler(osi_socket afd, void (*aproc) (int));
85 static int FindHandler(osi_socket afd);
86 static int FindHandler_r(osi_socket afd);
87 static int RemoveHandler(osi_socket afd);
88 static void GetHandler(fd_set * fdsetp, int *maxfdp);
89
90 static int AllocNode(struct SalvageQueueNode ** node);
91
92 static int AddToSalvageQueue(struct SalvageQueueNode * node);
93 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
94 static void AddToPendingQueue(struct SalvageQueueNode * node);
95 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
96 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
97 static void UpdateCommandPrio(struct SalvageQueueNode * node);
98 static void HandlePrio(struct SalvageQueueNode * clone,
99                        struct SalvageQueueNode * parent,
100                        afs_uint32 new_prio);
101
102 static int LinkNode(struct SalvageQueueNode * parent,
103                     struct SalvageQueueNode * clone);
104
105 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
106                                             struct SalvageQueueNode ** parent);
107 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
108                                                      struct SalvageQueueNode ** parent);
109 static void AddNodeToHash(struct SalvageQueueNode * node);
110
111 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
112 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
113 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
114 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
115 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
116
117
118 extern int VInit;
119 extern pthread_mutex_t vol_salvsync_mutex;
120
121 /**
122  * salvsync server socket handle.
123  */
124 static SYNC_server_state_t salvsync_server_state =
125     { OSI_NULLSOCKET,                       /* file descriptor */
126       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
127       SALVSYNC_PROTO_VERSION,   /* protocol version */
128       5,                        /* bind() retry limit */
129       100,                      /* listen() queue depth */
130       "SALVSYNC",               /* protocol name string */
131     };
132
133
134 /**
135  * queue of all volumes waiting to be salvaged.
136  */
137 struct SalvageQueue {
138     volatile int total_len;
139     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
140     volatile int len[VOLMAXPARTS+1];
141     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
142     pthread_cond_t cv;
143 };
144 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
145
146 /**
147  * queue of all volumes currently being salvaged.
148  */
149 struct QueueHead {
150     volatile struct rx_queue q;  /**< queue of salvages in progress */
151     volatile int len;            /**< length of in-progress queue */
152     pthread_cond_t queue_change_cv;
153 };
154 static struct QueueHead pendingQueue;  /* volumes being salvaged */
155
156 /* XXX
157  * whether a partition has a salvage in progress
158  *
159  * the salvager code only permits one salvage per partition at a time
160  *
161  * the following hack tries to keep salvaged parallelism high by
162  * only permitting one salvage dispatch per partition at a time
163  *
164  * unfortunately, the parallel salvager currently
165  * has a rather braindead routine that won't permit
166  * multiple salvages on the same "device".  this
167  * function happens to break pretty badly on lvm, raid luns, etc.
168  *
169  * this hack isn't good enough to stop the device limiting code from
170  * crippling performance.  someday that code needs to be rewritten
171  */
172 static int partition_salvaging[VOLMAXPARTS+1];
173
174 static int HandlerFD[MAXHANDLERS];
175 static void (*HandlerProc[MAXHANDLERS]) (int);
176
177 #define VSHASH_SIZE 64
178 #define VSHASH_MASK (VSHASH_SIZE-1)
179 #define VSHASH(vid) ((vid)&VSHASH_MASK)
180
181 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
182
183 static struct SalvageQueueNode *
184 LookupNode(VolumeId vid, char * partName,
185            struct SalvageQueueNode ** parent)
186 {
187     struct rx_queue *qp, *nqp;
188     struct SalvageQueueNode *vsp = NULL;
189     int idx = VSHASH(vid);
190
191     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
192         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
193         if ((vsp->command.sop.volume == vid) &&
194             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
195             break;
196         }
197     }
198
199     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
200         vsp = NULL;
201     }
202
203     if (parent) {
204         if (vsp) {
205             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
206                 vsp->volgroup.parent : vsp;
207         } else {
208             *parent = NULL;
209         }
210     }
211
212     return vsp;
213 }
214
215 static struct SalvageQueueNode *
216 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
217                     struct SalvageQueueNode ** parent)
218 {
219     return LookupNode(qry->volume, qry->partName, parent);
220 }
221
222 static void
223 AddNodeToHash(struct SalvageQueueNode * node)
224 {
225     int idx = VSHASH(node->command.sop.volume);
226
227     if (queue_IsOnQueue(&node->hash_chain)) {
228         return;
229     }
230
231     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
232     SalvageHashTable[idx].len++;
233 }
234
235 void
236 SALVSYNC_salvInit(void)
237 {
238     int i;
239     pthread_t tid;
240     pthread_attr_t tattr;
241
242     /* initialize the queues */
243     Lock_Init(&SALVSYNC_handler_lock);
244     CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
245     for (i = 0; i <= VOLMAXPARTS; i++) {
246         queue_Init(&salvageQueue.part[i]);
247         salvageQueue.len[i] = 0;
248     }
249     CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
250     queue_Init(&pendingQueue);
251     salvageQueue.total_len = pendingQueue.len = 0;
252     salvageQueue.last_insert = -1;
253     memset(partition_salvaging, 0, sizeof(partition_salvaging));
254
255     for (i = 0; i < VSHASH_SIZE; i++) {
256         CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
257         SalvageHashTable[i].len = 0;
258         queue_Init(&SalvageHashTable[i]);
259     }
260
261     /* start the salvsync thread */
262     opr_Verify(pthread_attr_init(&tattr) == 0);
263     opr_Verify(pthread_attr_setdetachstate(&tattr,
264                                            PTHREAD_CREATE_DETACHED) == 0);
265     opr_Verify(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
266 }
267
268 static void
269 CleanFDs(void)
270 {
271     int i;
272     for (i = 0; i < MAXHANDLERS; ++i) {
273         if (HandlerFD[i] >= 0) {
274             SALVSYNC_Drop(HandlerFD[i]);
275         }
276     }
277
278     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
279      * have a handler */
280     close(salvsync_server_state.fd);
281     salvsync_server_state.fd = OSI_NULLSOCKET;
282 }
283
284 static fd_set SALVSYNC_readfds;
285
286 static void *
287 SALVSYNC_syncThread(void * args)
288 {
289     int code;
290     SYNC_server_state_t * state = &salvsync_server_state;
291
292     /* when we fork, the child needs to close the salvsync server sockets,
293      * otherwise, it may get salvsync requests, instead of the parent
294      * salvageserver */
295     opr_Verify(pthread_atfork(NULL, NULL, CleanFDs) == 0);
296
297     SYNC_getAddr(&state->endpoint, &state->addr);
298     SYNC_cleanupSock(state);
299
300 #ifndef AFS_NT40_ENV
301     (void)signal(SIGPIPE, SIG_IGN);
302 #endif
303
304     state->fd = SYNC_getSock(&state->endpoint);
305     code = SYNC_bindSock(state);
306     opr_Assert(!code);
307
308     InitHandler();
309     AcceptOn();
310
311     for (;;) {
312         int maxfd;
313         struct timeval s_timeout;
314         GetHandler(&SALVSYNC_readfds, &maxfd);
315         s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
316         s_timeout.tv_usec = 0;
317         /* Note: check for >= 1 below is essential since IOMGR_select
318          * doesn't have exactly same semantics as select.
319          */
320         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
321             CallHandler(&SALVSYNC_readfds);
322     }
323
324     AFS_UNREACHED(return(NULL));
325 }
326
327 static void
328 SALVSYNC_newconnection(int afd)
329 {
330 #ifdef USE_UNIX_SOCKETS
331     struct sockaddr_un other;
332 #else  /* USE_UNIX_SOCKETS */
333     struct sockaddr_in other;
334 #endif
335     int fd;
336     socklen_t junk;
337
338     junk = sizeof(other);
339     fd = accept(afd, (struct sockaddr *)&other, &junk);
340     if (fd == OSI_NULLSOCKET) {
341         osi_Panic("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
342     } else if (!AddHandler(fd, SALVSYNC_com)) {
343         AcceptOff();
344         opr_Verify(AddHandler(fd, SALVSYNC_com));
345     }
346 }
347
348 /* this function processes commands from an salvsync file descriptor (fd) */
349 static afs_int32 SALV_cnt = 0;
350 static void
351 SALVSYNC_com(osi_socket fd)
352 {
353     SYNC_command com;
354     SYNC_response res;
355     SALVSYNC_response_hdr sres_hdr;
356     SALVSYNC_command scom;
357     SALVSYNC_response sres;
358     SYNC_PROTO_BUF_DECL(buf);
359
360     memset(&com, 0, sizeof(com));
361     memset(&res, 0, sizeof(res));
362     memset(&scom, 0, sizeof(scom));
363     memset(&sres, 0, sizeof(sres));
364     memset(&sres_hdr, 0, sizeof(sres_hdr));
365
366     com.payload.buf = (void *)buf;
367     com.payload.len = SYNC_PROTO_MAX_LEN;
368     res.payload.buf = (void *) &sres_hdr;
369     res.payload.len = sizeof(sres_hdr);
370     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
371     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
372
373     scom.hdr = &com.hdr;
374     scom.sop = (SALVSYNC_command_hdr *) buf;
375     scom.com = &com;
376     sres.hdr = &res.hdr;
377     sres.sop = &sres_hdr;
378     sres.res = &res;
379
380     SALV_cnt++;
381     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
382         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
383         SALVSYNC_Drop(fd);
384         return;
385     }
386
387     if (com.recv_len < sizeof(com.hdr)) {
388         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
389         res.hdr.response = SYNC_COM_ERROR;
390         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
391         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
392         goto respond;
393     }
394
395     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
396         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
397         res.hdr.response = SYNC_COM_ERROR;
398         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
399         goto respond;
400     }
401
402     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
403         res.hdr.response = SYNC_OK;
404         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
405
406         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
407          * never wait for a response. */
408         goto done;
409     }
410
411     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
412         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
413         res.hdr.response = SYNC_COM_ERROR;
414         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
415         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
416         goto respond;
417     }
418
419     res.hdr.com_seq = com.hdr.com_seq;
420
421     VOL_LOCK;
422     switch (com.hdr.command) {
423     case SALVSYNC_NOP:
424         break;
425     case SALVSYNC_SALVAGE:
426     case SALVSYNC_RAISEPRIO:
427         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
428         break;
429     case SALVSYNC_CANCEL:
430         /* cancel a salvage */
431         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
432         break;
433     case SALVSYNC_CANCELALL:
434         /* cancel all queued salvages */
435         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
436         break;
437     case SALVSYNC_QUERY:
438         /* query whether a volume is done salvaging */
439         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
440         break;
441     case SALVSYNC_OP_LINK:
442         /* link a clone to its parent in the scheduler */
443         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
444         break;
445     default:
446         res.hdr.response = SYNC_BAD_COMMAND;
447         break;
448     }
449
450     sres_hdr.sq_len = salvageQueue.total_len;
451     sres_hdr.pq_len = pendingQueue.len;
452     VOL_UNLOCK;
453
454  respond:
455     SYNC_putRes(&salvsync_server_state, fd, &res);
456
457  done:
458     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
459         SALVSYNC_Drop(fd);
460     }
461 }
462
463 /**
464  * request that a volume be salvaged.
465  *
466  * @param[in]  com  inbound command object
467  * @param[out] res  outbound response object
468  *
469  * @return operation status
470  *    @retval SYNC_OK success
471  *    @retval SYNC_DENIED failed to enqueue request
472  *    @retval SYNC_FAILED malformed command packet
473  *
474  * @note this is a SALVSYNC protocol rpc handler
475  *
476  * @internal
477  *
478  * @post the volume is enqueued in the to-be-salvaged queue.
479  *       if the volume was already in the salvage queue, its
480  *       priority (and thus its location in the queue) are
481  *       updated.
482  */
483 static afs_int32
484 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
485 {
486     afs_int32 code = SYNC_OK;
487     struct SalvageQueueNode * node, * clone;
488     int hash = 0;
489
490     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
491         code = SYNC_FAILED;
492         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
493         goto done;
494     }
495
496     clone = LookupNodeByCommand(com->sop, &node);
497
498     if (node == NULL) {
499         if (AllocNode(&node)) {
500             code = SYNC_DENIED;
501             res->hdr->reason = SYNC_REASON_NOMEM;
502             goto done;
503         }
504         clone = node;
505         hash = 1;
506     }
507
508     HandlePrio(clone, node, com->sop->prio);
509
510     switch (node->state) {
511     case SALVSYNC_STATE_QUEUED:
512         UpdateCommandPrio(node);
513         break;
514
515     case SALVSYNC_STATE_ERROR:
516     case SALVSYNC_STATE_DONE:
517     case SALVSYNC_STATE_UNKNOWN:
518         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
519         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
520
521         /*
522          * make sure volgroup parent partition path is kept coherent
523          *
524          * If we ever want to support non-COW clones on a machine holding
525          * the RW site, please note that this code does not work under the
526          * conditions where someone zaps a COW clone on partition X, and
527          * subsequently creates a full clone on partition Y -- we'd need
528          * an inverse to SALVSYNC_com_Link.
529          *  -- tkeiser 11/28/2007
530          */
531         strcpy(node->command.sop.partName, com->sop->partName);
532
533         if (AddToSalvageQueue(node)) {
534             code = SYNC_DENIED;
535         }
536         break;
537
538     default:
539         break;
540     }
541
542     if (hash) {
543         AddNodeToHash(node);
544     }
545
546     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
547     res->sop->state = node->state;
548     res->sop->prio = node->command.sop.prio;
549
550  done:
551     return code;
552 }
553
554 /**
555  * cancel a pending salvage request.
556  *
557  * @param[in]  com  inbound command object
558  * @param[out] res  outbound response object
559  *
560  * @return operation status
561  *    @retval SYNC_OK success
562  *    @retval SYNC_FAILED malformed command packet
563  *
564  * @note this is a SALVSYNC protocol rpc handler
565  *
566  * @internal
567  */
568 static afs_int32
569 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
570 {
571     afs_int32 code = SYNC_OK;
572     struct SalvageQueueNode * node;
573
574     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
575         code = SYNC_FAILED;
576         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
577         goto done;
578     }
579
580     node = LookupNodeByCommand(com->sop, NULL);
581
582     if (node == NULL) {
583         res->sop->state = SALVSYNC_STATE_UNKNOWN;
584         res->sop->prio = 0;
585     } else {
586         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
587         res->sop->prio = node->command.sop.prio;
588         res->sop->state = node->state;
589         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
590             (node->state == SALVSYNC_STATE_QUEUED)) {
591             DeleteFromSalvageQueue(node);
592         }
593     }
594
595  done:
596     return code;
597 }
598
599 /**
600  * cancel all pending salvage requests.
601  *
602  * @param[in]  com  incoming command object
603  * @param[out] res  outbound response object
604  *
605  * @return operation status
606  *    @retval SYNC_OK success
607  *
608  * @note this is a SALVSYNC protocol rpc handler
609  *
610  * @internal
611  */
612 static afs_int32
613 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
614 {
615     struct SalvageQueueNode * np, *nnp;
616     struct DiskPartition64 * dp;
617
618     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
619         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
620             DeleteFromSalvageQueue(np);
621         }
622     }
623
624     return SYNC_OK;
625 }
626
627 /**
628  * link a queue node for a clone to its parent volume.
629  *
630  * @param[in]  com   inbound command object
631  * @param[out] res   outbound response object
632  *
633  * @return operation status
634  *    @retval SYNC_OK success
635  *    @retval SYNC_FAILED malformed command packet
636  *    @retval SYNC_DENIED the request could not be completed
637  *
638  * @note this is a SALVSYNC protocol rpc handler
639  *
640  * @post the requested volume is marked as a child of another volume.
641  *       thus, future salvage requests for this volume will result in the
642  *       parent of the volume group being scheduled for salvage instead
643  *       of this clone.
644  *
645  * @internal
646  */
647 static afs_int32
648 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
649 {
650     afs_int32 code = SYNC_OK;
651     struct SalvageQueueNode * clone, * parent;
652
653     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
654         code = SYNC_FAILED;
655         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
656         goto done;
657     }
658
659     /* lookup clone's salvage scheduling node */
660     clone = LookupNodeByCommand(com->sop, NULL);
661     if (clone == NULL) {
662         code = SYNC_DENIED;
663         res->hdr->reason = SALVSYNC_REASON_ERROR;
664         goto done;
665     }
666
667     /* lookup parent's salvage scheduling node */
668     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
669     if (parent == NULL) {
670         if (AllocNode(&parent)) {
671             code = SYNC_DENIED;
672             res->hdr->reason = SYNC_REASON_NOMEM;
673             goto done;
674         }
675         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
676         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
677         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
678         AddNodeToHash(parent);
679     }
680
681     if (LinkNode(parent, clone)) {
682         code = SYNC_DENIED;
683         goto done;
684     }
685
686  done:
687     return code;
688 }
689
690 /**
691  * query the status of a volume salvage request.
692  *
693  * @param[in]  com   inbound command object
694  * @param[out] res   outbound response object
695  *
696  * @return operation status
697  *    @retval SYNC_OK success
698  *    @retval SYNC_FAILED malformed command packet
699  *
700  * @note this is a SALVSYNC protocol rpc handler
701  *
702  * @internal
703  */
704 static afs_int32
705 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
706 {
707     afs_int32 code = SYNC_OK;
708     struct SalvageQueueNode * node;
709
710     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
711         code = SYNC_FAILED;
712         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
713         goto done;
714     }
715
716     LookupNodeByCommand(com->sop, &node);
717
718     /* query whether a volume is done salvaging */
719     if (node == NULL) {
720         res->sop->state = SALVSYNC_STATE_UNKNOWN;
721         res->sop->prio = 0;
722     } else {
723         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
724         res->sop->state = node->state;
725         res->sop->prio = node->command.sop.prio;
726     }
727
728  done:
729     return code;
730 }
731
732 static void
733 SALVSYNC_Drop(osi_socket fd)
734 {
735     RemoveHandler(fd);
736     rk_closesocket(fd);
737     AcceptOn();
738 }
739
740 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
741
742 static void
743 AcceptOn(void)
744 {
745     if (AcceptHandler == -1) {
746         opr_Verify(AddHandler(salvsync_server_state.fd,
747                               SALVSYNC_newconnection));
748         AcceptHandler = FindHandler(salvsync_server_state.fd);
749     }
750 }
751
752 static void
753 AcceptOff(void)
754 {
755     if (AcceptHandler != -1) {
756         opr_Verify(RemoveHandler(salvsync_server_state.fd));
757         AcceptHandler = -1;
758     }
759 }
760
761 /* The multiple FD handling code. */
762
763 static void
764 InitHandler(void)
765 {
766     int i;
767     ObtainWriteLock(&SALVSYNC_handler_lock);
768     for (i = 0; i < MAXHANDLERS; i++) {
769         HandlerFD[i] = OSI_NULLSOCKET;
770         HandlerProc[i] = NULL;
771     }
772     ReleaseWriteLock(&SALVSYNC_handler_lock);
773 }
774
775 static void
776 CallHandler(fd_set * fdsetp)
777 {
778     int i;
779     ObtainReadLock(&SALVSYNC_handler_lock);
780     for (i = 0; i < MAXHANDLERS; i++) {
781         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
782             ReleaseReadLock(&SALVSYNC_handler_lock);
783             (*HandlerProc[i]) (HandlerFD[i]);
784             ObtainReadLock(&SALVSYNC_handler_lock);
785         }
786     }
787     ReleaseReadLock(&SALVSYNC_handler_lock);
788 }
789
790 static int
791 AddHandler(osi_socket afd, void (*aproc) (int))
792 {
793     int i;
794     ObtainWriteLock(&SALVSYNC_handler_lock);
795     for (i = 0; i < MAXHANDLERS; i++)
796         if (HandlerFD[i] == OSI_NULLSOCKET)
797             break;
798     if (i >= MAXHANDLERS) {
799         ReleaseWriteLock(&SALVSYNC_handler_lock);
800         return 0;
801     }
802     HandlerFD[i] = afd;
803     HandlerProc[i] = aproc;
804     ReleaseWriteLock(&SALVSYNC_handler_lock);
805     return 1;
806 }
807
808 static int
809 FindHandler(osi_socket afd)
810 {
811     int i;
812     ObtainReadLock(&SALVSYNC_handler_lock);
813     for (i = 0; i < MAXHANDLERS; i++)
814         if (HandlerFD[i] == afd) {
815             ReleaseReadLock(&SALVSYNC_handler_lock);
816             return i;
817         }
818     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
819     osi_Panic("Failed to find handler\n");
820     AFS_UNREACHED(return -1);
821 }
822
823 static int
824 FindHandler_r(osi_socket afd)
825 {
826     int i;
827     for (i = 0; i < MAXHANDLERS; i++)
828         if (HandlerFD[i] == afd) {
829             return i;
830         }
831     osi_Panic("Failed to find handler\n");
832     AFS_UNREACHED(return -1);
833 }
834
835 static int
836 RemoveHandler(osi_socket afd)
837 {
838     ObtainWriteLock(&SALVSYNC_handler_lock);
839     HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
840     ReleaseWriteLock(&SALVSYNC_handler_lock);
841     return 1;
842 }
843
844 static void
845 GetHandler(fd_set * fdsetp, int *maxfdp)
846 {
847     int i;
848     int maxfd = -1;
849     FD_ZERO(fdsetp);
850     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
851     for (i = 0; i < MAXHANDLERS; i++)
852         if (HandlerFD[i] != OSI_NULLSOCKET) {
853             FD_SET(HandlerFD[i], fdsetp);
854 #ifndef AFS_NT40_ENV
855             /* On Windows the nfds parameter to select() is ignored */
856             if (maxfd < HandlerFD[i] || maxfd == (int)-1)
857                 maxfd = HandlerFD[i];
858 #endif
859         }
860     *maxfdp = maxfd;
861     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
862 }
863
864 /**
865  * allocate a salvage queue node.
866  *
867  * @param[out] node_out  address in which to store new node pointer
868  *
869  * @return operation status
870  *    @retval 0 success
871  *    @retval 1 failed to allocate node
872  *
873  * @internal
874  */
875 static int
876 AllocNode(struct SalvageQueueNode ** node_out)
877 {
878     int code = 0;
879     struct SalvageQueueNode * node;
880
881     *node_out = node = calloc(1, sizeof(struct SalvageQueueNode));
882     if (node == NULL) {
883         code = 1;
884         goto done;
885     }
886
887     node->type = SALVSYNC_VOLGROUP_PARENT;
888     node->state = SALVSYNC_STATE_UNKNOWN;
889
890  done:
891     return code;
892 }
893
894 /**
895  * link a salvage queue node to its parent.
896  *
897  * @param[in] parent  pointer to queue node for parent of volume group
898  * @param[in] clone   pointer to queue node for a clone
899  *
900  * @return operation status
901  *    @retval 0 success
902  *    @retval 1 failure
903  *
904  * @internal
905  */
906 static int
907 LinkNode(struct SalvageQueueNode * parent,
908          struct SalvageQueueNode * clone)
909 {
910     int code = 0;
911     int idx;
912
913     /* check for attaching a clone to a clone */
914     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
915         code = 1;
916         goto done;
917     }
918
919     /* check for pre-existing registration and openings */
920     for (idx = 0; idx < VOLMAXTYPES; idx++) {
921         if (parent->volgroup.children[idx] == clone) {
922             goto linked;
923         }
924         if (parent->volgroup.children[idx] == NULL) {
925             break;
926         }
927     }
928     if (idx == VOLMAXTYPES) {
929         code = 1;
930         goto done;
931     }
932
933     /* link parent and child */
934     parent->volgroup.children[idx] = clone;
935     clone->type = SALVSYNC_VOLGROUP_CLONE;
936     clone->volgroup.parent = parent;
937
938
939  linked:
940     switch (clone->state) {
941     case SALVSYNC_STATE_QUEUED:
942         DeleteFromSalvageQueue(clone);
943         /* fall through */
944     case SALVSYNC_STATE_SALVAGING:
945         switch (parent->state) {
946         case SALVSYNC_STATE_UNKNOWN:
947         case SALVSYNC_STATE_ERROR:
948         case SALVSYNC_STATE_DONE:
949             parent->command.sop.prio = clone->command.sop.prio;
950             AddToSalvageQueue(parent);
951             break;
952
953         case SALVSYNC_STATE_QUEUED:
954             if (clone->command.sop.prio) {
955                 parent->command.sop.prio += clone->command.sop.prio;
956                 UpdateCommandPrio(parent);
957             }
958             break;
959
960         default:
961             break;
962         }
963         break;
964
965     default:
966         break;
967     }
968
969  done:
970     return code;
971 }
972
973 static void
974 HandlePrio(struct SalvageQueueNode * clone,
975            struct SalvageQueueNode * node,
976            afs_uint32 new_prio)
977 {
978     afs_uint32 delta;
979
980     switch (node->state) {
981     case SALVSYNC_STATE_ERROR:
982     case SALVSYNC_STATE_DONE:
983     case SALVSYNC_STATE_UNKNOWN:
984         node->command.sop.prio = 0;
985         break;
986     default:
987         break;
988     }
989
990     if (new_prio < clone->command.sop.prio) {
991         /* strange. let's just set our delta to 1 */
992         delta = 1;
993     } else {
994         delta = new_prio - clone->command.sop.prio;
995     }
996
997     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
998         clone->command.sop.prio = new_prio;
999     }
1000
1001     node->command.sop.prio += delta;
1002 }
1003
1004 static int
1005 AddToSalvageQueue(struct SalvageQueueNode * node)
1006 {
1007     afs_int32 id;
1008     struct SalvageQueueNode * last = NULL;
1009
1010     id = volutil_GetPartitionID(node->command.sop.partName);
1011     if (id < 0 || id > VOLMAXPARTS) {
1012         return 1;
1013     }
1014     if (!VGetPartitionById_r(id, 0)) {
1015         /* don't enqueue salvage requests for unmounted partitions */
1016         return 1;
1017     }
1018     if (queue_IsOnQueue(node)) {
1019         return 0;
1020     }
1021
1022     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1023         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1024     }
1025     queue_Append(&salvageQueue.part[id], node);
1026     salvageQueue.len[id]++;
1027     salvageQueue.total_len++;
1028     salvageQueue.last_insert = id;
1029     node->partition_id = id;
1030     node->state = SALVSYNC_STATE_QUEUED;
1031
1032     /* reorder, if necessary */
1033     if (last && last->command.sop.prio < node->command.sop.prio) {
1034         UpdateCommandPrio(node);
1035     }
1036
1037     CV_BROADCAST(&salvageQueue.cv);
1038     return 0;
1039 }
1040
1041 static void
1042 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1043 {
1044     if (queue_IsOnQueue(node)) {
1045         queue_Remove(node);
1046         salvageQueue.len[node->partition_id]--;
1047         salvageQueue.total_len--;
1048         node->state = SALVSYNC_STATE_UNKNOWN;
1049         CV_BROADCAST(&salvageQueue.cv);
1050     }
1051 }
1052
1053 static void
1054 AddToPendingQueue(struct SalvageQueueNode * node)
1055 {
1056     queue_Append(&pendingQueue, node);
1057     pendingQueue.len++;
1058     node->state = SALVSYNC_STATE_SALVAGING;
1059     CV_BROADCAST(&pendingQueue.queue_change_cv);
1060 }
1061
1062 static void
1063 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1064 {
1065     if (queue_IsOnQueue(node)) {
1066         queue_Remove(node);
1067         pendingQueue.len--;
1068         node->state = SALVSYNC_STATE_UNKNOWN;
1069         CV_BROADCAST(&pendingQueue.queue_change_cv);
1070     }
1071 }
1072
1073 static struct SalvageQueueNode *
1074 LookupPendingCommandByPid(int pid)
1075 {
1076     struct SalvageQueueNode * np, * nnp;
1077
1078     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1079         if (np->pid == pid)
1080             break;
1081     }
1082
1083     if (queue_IsEnd(&pendingQueue, np))
1084         np = NULL;
1085     return np;
1086 }
1087
1088
1089 /* raise the priority of a previously scheduled salvage */
1090 static void
1091 UpdateCommandPrio(struct SalvageQueueNode * node)
1092 {
1093     struct SalvageQueueNode *np, *nnp;
1094     afs_int32 id;
1095     afs_uint32 prio;
1096
1097     opr_Assert(queue_IsOnQueue(node));
1098
1099     prio = node->command.sop.prio;
1100     id = node->partition_id;
1101     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1102         queue_Remove(node);
1103         queue_Prepend(&salvageQueue.part[id], node);
1104     } else {
1105         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1106             if (np->command.sop.prio > prio)
1107                 break;
1108         }
1109         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1110             queue_Remove(node);
1111             queue_Prepend(&salvageQueue.part[id], node);
1112         } else if (node != np) {
1113             queue_Remove(node);
1114             queue_InsertAfter(np, node);
1115         }
1116     }
1117 }
1118
1119 /* this will need to be rearchitected if we ever want more than one thread
1120  * to wait for new salvage nodes */
1121 struct SalvageQueueNode *
1122 SALVSYNC_getWork(void)
1123 {
1124     int i;
1125     struct DiskPartition64 * dp = NULL, * fdp;
1126     static afs_int32 next_part_sched = 0;
1127     struct SalvageQueueNode *node = NULL;
1128
1129     VOL_LOCK;
1130
1131     /*
1132      * wait for work to be scheduled
1133      * if there are no disk partitions, just sit in this wait loop forever
1134      */
1135     while (!salvageQueue.total_len || !DiskPartitionList) {
1136         VOL_CV_WAIT(&salvageQueue.cv);
1137     }
1138
1139     /*
1140      * short circuit for simple case where only one partition has
1141      * scheduled salvages
1142      */
1143     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1144         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1145         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1146         goto have_node;
1147     }
1148
1149
1150     /*
1151      * ok, more than one partition has scheduled salvages.
1152      * now search for partitions with scheduled salvages, but no pending salvages.
1153      */
1154     dp = VGetPartitionById_r(next_part_sched, 0);
1155     if (!dp) {
1156         dp = DiskPartitionList;
1157     }
1158     fdp = dp;
1159
1160     for (i=0 ;
1161          !i || dp != fdp ;
1162          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1163         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1164             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1165             goto have_node;
1166         }
1167     }
1168
1169
1170     /*
1171      * all partitions with scheduled salvages have at least one pending.
1172      * now do an exhaustive search for a scheduled salvage.
1173      */
1174     dp = fdp;
1175
1176     for (i=0 ;
1177          !i || dp != fdp ;
1178          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1179         if (salvageQueue.len[dp->index]) {
1180             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1181             goto have_node;
1182         }
1183     }
1184
1185     /* we should never reach this line */
1186     osi_Panic("Node not found\n");
1187
1188  have_node:
1189     opr_Assert(node != NULL);
1190     node->pid = 0;
1191     partition_salvaging[node->partition_id]++;
1192     DeleteFromSalvageQueue(node);
1193     AddToPendingQueue(node);
1194
1195     if (dp) {
1196         /* update next_part_sched field */
1197         if (dp->next) {
1198             next_part_sched = dp->next->index;
1199         } else if (DiskPartitionList) {
1200             next_part_sched = DiskPartitionList->index;
1201         } else {
1202             next_part_sched = -1;
1203         }
1204     }
1205
1206     VOL_UNLOCK;
1207     return node;
1208 }
1209
1210 /**
1211  * update internal scheduler state to reflect completion of a work unit.
1212  *
1213  * @param[in]  node    salvage queue node object pointer
1214  * @param[in]  result  worker process result code
1215  *
1216  * @post scheduler state is updated.
1217  *
1218  * @internal
1219  */
1220 static void
1221 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1222 {
1223     afs_int32 partid;
1224     int idx;
1225
1226     DeleteFromPendingQueue(node);
1227     partid = node->partition_id;
1228     if (partid >=0 && partid <= VOLMAXPARTS) {
1229         partition_salvaging[partid]--;
1230     }
1231     if (result == 0) {
1232         node->state = SALVSYNC_STATE_DONE;
1233     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1234         node->state = SALVSYNC_STATE_ERROR;
1235     }
1236
1237     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1238         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1239             if (node->volgroup.children[idx]) {
1240                 node->volgroup.children[idx]->state = node->state;
1241             }
1242         }
1243     }
1244 }
1245
1246 /**
1247  * check whether worker child failed.
1248  *
1249  * @param[in] status  status bitfield return by wait()
1250  *
1251  * @return boolean failure code
1252  *    @retval 0 child succeeded
1253  *    @retval 1 child failed
1254  *
1255  * @internal
1256  */
1257 static int
1258 ChildFailed(int status)
1259 {
1260     return (WCOREDUMP(status) ||
1261             WIFSIGNALED(status) ||
1262             ((WEXITSTATUS(status) != 0) &&
1263              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1264 }
1265
1266
1267 /**
1268  * notify salvsync scheduler of node completion, by child pid.
1269  *
1270  * @param[in]  pid     pid of worker child
1271  * @param[in]  status  worker status bitfield from wait()
1272  *
1273  * @post scheduler state is updated.
1274  *       if status code is a failure, fileserver notification was attempted
1275  *
1276  * @see SALVSYNC_doneWork_r
1277  */
1278 void
1279 SALVSYNC_doneWorkByPid(int pid, int status)
1280 {
1281     struct SalvageQueueNode * node;
1282     char partName[16];
1283     VolumeId volids[VOLMAXTYPES+1];
1284     unsigned int idx;
1285
1286     memset(volids, 0, sizeof(volids));
1287
1288     VOL_LOCK;
1289     node = LookupPendingCommandByPid(pid);
1290     if (node != NULL) {
1291         SALVSYNC_doneWork_r(node, status);
1292
1293         if (ChildFailed(status)) {
1294             /* populate volume id list for later processing outside the glock */
1295             volids[0] = node->command.sop.volume;
1296             strcpy(partName, node->command.sop.partName);
1297             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1298                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1299                     if (node->volgroup.children[idx]) {
1300                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1301                     }
1302                 }
1303             }
1304         }
1305     }
1306     VOL_UNLOCK;
1307
1308     /*
1309      * if necessary, notify fileserver of
1310      * failure to salvage volume group
1311      * [we cannot guarantee that the child made the
1312      *  appropriate notifications (e.g. SIGSEGV)]
1313      *  -- tkeiser 11/28/2007
1314      */
1315     if (ChildFailed(status)) {
1316         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1317             if (volids[idx]) {
1318                 FSYNC_VolOp(volids[idx],
1319                             partName,
1320                             FSYNC_VOL_FORCE_ERROR,
1321                             FSYNC_WHATEVER,
1322                             NULL);
1323             }
1324         }
1325     }
1326 }
1327
1328 #endif /* AFS_DEMAND_ATTACH_FS */