salvager: do not redefine SalvageVolumeGroup
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 #include <afs/procmgmt.h>
30 #include <roken.h>
31
32 #include <stddef.h>
33
34 #ifdef AFS_NT40_ENV
35 #include <winsock2.h>
36 #endif
37
38 #include <afs/afs_assert.h>
39 #include <rx/xdr.h>
40 #include <afs/afsint.h>
41 #include "nfs.h"
42 #include <afs/errors.h>
43 #include "salvsync.h"
44 #include "lwp.h"
45 #include "lock.h"
46 #include <afs/afssyscalls.h>
47 #include "ihandle.h"
48 #include "vnode.h"
49 #include "volume.h"
50 #include "partition.h"
51 #include "common.h"
52 #include <rx/rx_queue.h>
53
54 #ifdef USE_UNIX_SOCKETS
55 #include <afs/afsutil.h>
56 #include <sys/un.h>
57 #endif
58
59 #ifndef WCOREDUMP
60 #define WCOREDUMP(x)    ((x) & 0200)
61 #endif
62
63 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
64                                  * move = dump+restore can run on single server */
65
66
67 /*
68  * This lock controls access to the handler array.
69  */
70 struct Lock SALVSYNC_handler_lock;
71
72
73 #ifdef AFS_DEMAND_ATTACH_FS
74 /*
75  * SALVSYNC is a feature specific to the demand attach fileserver
76  */
77
78 /* Forward declarations */
79 static void * SALVSYNC_syncThread(void *);
80 static void SALVSYNC_newconnection(osi_socket fd);
81 static void SALVSYNC_com(osi_socket fd);
82 static void SALVSYNC_Drop(osi_socket fd);
83 static void AcceptOn(void);
84 static void AcceptOff(void);
85 static void InitHandler(void);
86 static void CallHandler(fd_set * fdsetp);
87 static int AddHandler(osi_socket afd, void (*aproc) (int));
88 static int FindHandler(osi_socket afd);
89 static int FindHandler_r(osi_socket afd);
90 static int RemoveHandler(osi_socket afd);
91 static void GetHandler(fd_set * fdsetp, int *maxfdp);
92
93 static int AllocNode(struct SalvageQueueNode ** node);
94
95 static int AddToSalvageQueue(struct SalvageQueueNode * node);
96 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
97 static void AddToPendingQueue(struct SalvageQueueNode * node);
98 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
99 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
100 static void UpdateCommandPrio(struct SalvageQueueNode * node);
101 static void HandlePrio(struct SalvageQueueNode * clone,
102                        struct SalvageQueueNode * parent,
103                        afs_uint32 new_prio);
104
105 static int LinkNode(struct SalvageQueueNode * parent,
106                     struct SalvageQueueNode * clone);
107
108 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
109                                             struct SalvageQueueNode ** parent);
110 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
111                                                      struct SalvageQueueNode ** parent);
112 static void AddNodeToHash(struct SalvageQueueNode * node);
113
114 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
115 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
116 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
117 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
118 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
119
120
121 extern int LogLevel;
122 extern int VInit;
123 extern pthread_mutex_t vol_salvsync_mutex;
124
125 /**
126  * salvsync server socket handle.
127  */
128 static SYNC_server_state_t salvsync_server_state =
129     { OSI_NULLSOCKET,                       /* file descriptor */
130       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
131       SALVSYNC_PROTO_VERSION,   /* protocol version */
132       5,                        /* bind() retry limit */
133       100,                      /* listen() queue depth */
134       "SALVSYNC",               /* protocol name string */
135     };
136
137
138 /**
139  * queue of all volumes waiting to be salvaged.
140  */
141 struct SalvageQueue {
142     volatile int total_len;
143     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
144     volatile int len[VOLMAXPARTS+1];
145     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
146     pthread_cond_t cv;
147 };
148 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
149
150 /**
151  * queue of all volumes currently being salvaged.
152  */
153 struct QueueHead {
154     volatile struct rx_queue q;  /**< queue of salvages in progress */
155     volatile int len;            /**< length of in-progress queue */
156     pthread_cond_t queue_change_cv;
157 };
158 static struct QueueHead pendingQueue;  /* volumes being salvaged */
159
160 /* XXX
161  * whether a partition has a salvage in progress
162  *
163  * the salvager code only permits one salvage per partition at a time
164  *
165  * the following hack tries to keep salvaged parallelism high by
166  * only permitting one salvage dispatch per partition at a time
167  *
168  * unfortunately, the parallel salvager currently
169  * has a rather braindead routine that won't permit
170  * multiple salvages on the same "device".  this
171  * function happens to break pretty badly on lvm, raid luns, etc.
172  *
173  * this hack isn't good enough to stop the device limiting code from
174  * crippling performance.  someday that code needs to be rewritten
175  */
176 static int partition_salvaging[VOLMAXPARTS+1];
177
178 static int HandlerFD[MAXHANDLERS];
179 static void (*HandlerProc[MAXHANDLERS]) (int);
180
181 #define VSHASH_SIZE 64
182 #define VSHASH_MASK (VSHASH_SIZE-1)
183 #define VSHASH(vid) ((vid)&VSHASH_MASK)
184
185 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
186
187 static struct SalvageQueueNode *
188 LookupNode(afs_uint32 vid, char * partName,
189            struct SalvageQueueNode ** parent)
190 {
191     struct rx_queue *qp, *nqp;
192     struct SalvageQueueNode *vsp;
193     int idx = VSHASH(vid);
194
195     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
196         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
197         if ((vsp->command.sop.volume == vid) &&
198             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
199             break;
200         }
201     }
202
203     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
204         vsp = NULL;
205     }
206
207     if (parent) {
208         if (vsp) {
209             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
210                 vsp->volgroup.parent : vsp;
211         } else {
212             *parent = NULL;
213         }
214     }
215
216     return vsp;
217 }
218
219 static struct SalvageQueueNode *
220 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
221                     struct SalvageQueueNode ** parent)
222 {
223     return LookupNode(qry->volume, qry->partName, parent);
224 }
225
226 static void
227 AddNodeToHash(struct SalvageQueueNode * node)
228 {
229     int idx = VSHASH(node->command.sop.volume);
230
231     if (queue_IsOnQueue(&node->hash_chain)) {
232         return;
233     }
234
235     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
236     SalvageHashTable[idx].len++;
237 }
238
239 #if 0
240 static void
241 DeleteNodeFromHash(struct SalvageQueueNode * node)
242 {
243     int idx = VSHASH(node->command.sop.volume);
244
245     if (queue_IsNotOnQueue(&node->hash_chain)) {
246         return;
247     }
248
249     queue_Remove(&node->hash_chain);
250     SalvageHashTable[idx].len--;
251 }
252 #endif
253
254 void
255 SALVSYNC_salvInit(void)
256 {
257     int i;
258     pthread_t tid;
259     pthread_attr_t tattr;
260
261     /* initialize the queues */
262     Lock_Init(&SALVSYNC_handler_lock);
263     CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
264     for (i = 0; i <= VOLMAXPARTS; i++) {
265         queue_Init(&salvageQueue.part[i]);
266         salvageQueue.len[i] = 0;
267     }
268     CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
269     queue_Init(&pendingQueue);
270     salvageQueue.total_len = pendingQueue.len = 0;
271     salvageQueue.last_insert = -1;
272     memset(partition_salvaging, 0, sizeof(partition_salvaging));
273
274     for (i = 0; i < VSHASH_SIZE; i++) {
275         CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
276         SalvageHashTable[i].len = 0;
277         queue_Init(&SalvageHashTable[i]);
278     }
279
280     /* start the salvsync thread */
281     osi_Assert(pthread_attr_init(&tattr) == 0);
282     osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
283     osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
284 }
285
286 static void
287 CleanFDs(void)
288 {
289     int i;
290     for (i = 0; i < MAXHANDLERS; ++i) {
291         if (HandlerFD[i] >= 0) {
292             SALVSYNC_Drop(HandlerFD[i]);
293         }
294     }
295
296     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
297      * have a handler */
298     close(salvsync_server_state.fd);
299     salvsync_server_state.fd = OSI_NULLSOCKET;
300 }
301
302 static fd_set SALVSYNC_readfds;
303
304 static void *
305 SALVSYNC_syncThread(void * args)
306 {
307     int code;
308     SYNC_server_state_t * state = &salvsync_server_state;
309
310     /* when we fork, the child needs to close the salvsync server sockets,
311      * otherwise, it may get salvsync requests, instead of the parent
312      * salvageserver */
313     osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
314
315     SYNC_getAddr(&state->endpoint, &state->addr);
316     SYNC_cleanupSock(state);
317
318 #ifndef AFS_NT40_ENV
319     (void)signal(SIGPIPE, SIG_IGN);
320 #endif
321
322     state->fd = SYNC_getSock(&state->endpoint);
323     code = SYNC_bindSock(state);
324     osi_Assert(!code);
325
326     InitHandler();
327     AcceptOn();
328
329     for (;;) {
330         int maxfd;
331         struct timeval s_timeout;
332         GetHandler(&SALVSYNC_readfds, &maxfd);
333         s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
334         s_timeout.tv_usec = 0;
335         /* Note: check for >= 1 below is essential since IOMGR_select
336          * doesn't have exactly same semantics as select.
337          */
338         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
339             CallHandler(&SALVSYNC_readfds);
340     }
341
342     return NULL;
343 }
344
345 static void
346 SALVSYNC_newconnection(int afd)
347 {
348 #ifdef USE_UNIX_SOCKETS
349     struct sockaddr_un other;
350 #else  /* USE_UNIX_SOCKETS */
351     struct sockaddr_in other;
352 #endif
353     int fd;
354     socklen_t junk;
355
356     junk = sizeof(other);
357     fd = accept(afd, (struct sockaddr *)&other, &junk);
358     if (fd == OSI_NULLSOCKET) {
359         osi_Panic("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
360     } else if (!AddHandler(fd, SALVSYNC_com)) {
361         AcceptOff();
362         osi_Assert(AddHandler(fd, SALVSYNC_com));
363     }
364 }
365
366 /* this function processes commands from an salvsync file descriptor (fd) */
367 static afs_int32 SALV_cnt = 0;
368 static void
369 SALVSYNC_com(osi_socket fd)
370 {
371     SYNC_command com;
372     SYNC_response res;
373     SALVSYNC_response_hdr sres_hdr;
374     SALVSYNC_command scom;
375     SALVSYNC_response sres;
376     SYNC_PROTO_BUF_DECL(buf);
377
378     memset(&com, 0, sizeof(com));
379     memset(&res, 0, sizeof(res));
380     memset(&scom, 0, sizeof(scom));
381     memset(&sres, 0, sizeof(sres));
382     memset(&sres_hdr, 0, sizeof(sres_hdr));
383
384     com.payload.buf = (void *)buf;
385     com.payload.len = SYNC_PROTO_MAX_LEN;
386     res.payload.buf = (void *) &sres_hdr;
387     res.payload.len = sizeof(sres_hdr);
388     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
389     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
390
391     scom.hdr = &com.hdr;
392     scom.sop = (SALVSYNC_command_hdr *) buf;
393     scom.com = &com;
394     sres.hdr = &res.hdr;
395     sres.sop = &sres_hdr;
396     sres.res = &res;
397
398     SALV_cnt++;
399     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
400         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
401         SALVSYNC_Drop(fd);
402         return;
403     }
404
405     if (com.recv_len < sizeof(com.hdr)) {
406         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
407         res.hdr.response = SYNC_COM_ERROR;
408         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
409         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
410         goto respond;
411     }
412
413     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
414         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
415         res.hdr.response = SYNC_COM_ERROR;
416         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
417         goto respond;
418     }
419
420     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
421         res.hdr.response = SYNC_OK;
422         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
423
424         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
425          * never wait for a response. */
426         goto done;
427     }
428
429     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
430         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
431         res.hdr.response = SYNC_COM_ERROR;
432         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
433         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
434         goto respond;
435     }
436
437     res.hdr.com_seq = com.hdr.com_seq;
438
439     VOL_LOCK;
440     switch (com.hdr.command) {
441     case SALVSYNC_NOP:
442         break;
443     case SALVSYNC_SALVAGE:
444     case SALVSYNC_RAISEPRIO:
445         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
446         break;
447     case SALVSYNC_CANCEL:
448         /* cancel a salvage */
449         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
450         break;
451     case SALVSYNC_CANCELALL:
452         /* cancel all queued salvages */
453         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
454         break;
455     case SALVSYNC_QUERY:
456         /* query whether a volume is done salvaging */
457         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
458         break;
459     case SALVSYNC_OP_LINK:
460         /* link a clone to its parent in the scheduler */
461         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
462         break;
463     default:
464         res.hdr.response = SYNC_BAD_COMMAND;
465         break;
466     }
467
468     sres_hdr.sq_len = salvageQueue.total_len;
469     sres_hdr.pq_len = pendingQueue.len;
470     VOL_UNLOCK;
471
472  respond:
473     SYNC_putRes(&salvsync_server_state, fd, &res);
474
475  done:
476     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
477         SALVSYNC_Drop(fd);
478     }
479 }
480
481 /**
482  * request that a volume be salvaged.
483  *
484  * @param[in]  com  inbound command object
485  * @param[out] res  outbound response object
486  *
487  * @return operation status
488  *    @retval SYNC_OK success
489  *    @retval SYNC_DENIED failed to enqueue request
490  *    @retval SYNC_FAILED malformed command packet
491  *
492  * @note this is a SALVSYNC protocol rpc handler
493  *
494  * @internal
495  *
496  * @post the volume is enqueued in the to-be-salvaged queue.
497  *       if the volume was already in the salvage queue, its
498  *       priority (and thus its location in the queue) are
499  *       updated.
500  */
501 static afs_int32
502 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
503 {
504     afs_int32 code = SYNC_OK;
505     struct SalvageQueueNode * node, * clone;
506     int hash = 0;
507
508     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
509         code = SYNC_FAILED;
510         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
511         goto done;
512     }
513
514     clone = LookupNodeByCommand(com->sop, &node);
515
516     if (node == NULL) {
517         if (AllocNode(&node)) {
518             code = SYNC_DENIED;
519             res->hdr->reason = SYNC_REASON_NOMEM;
520             goto done;
521         }
522         clone = node;
523         hash = 1;
524     }
525
526     HandlePrio(clone, node, com->sop->prio);
527
528     switch (node->state) {
529     case SALVSYNC_STATE_QUEUED:
530         UpdateCommandPrio(node);
531         break;
532
533     case SALVSYNC_STATE_ERROR:
534     case SALVSYNC_STATE_DONE:
535     case SALVSYNC_STATE_UNKNOWN:
536         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
537         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
538
539         /*
540          * make sure volgroup parent partition path is kept coherent
541          *
542          * If we ever want to support non-COW clones on a machine holding
543          * the RW site, please note that this code does not work under the
544          * conditions where someone zaps a COW clone on partition X, and
545          * subsequently creates a full clone on partition Y -- we'd need
546          * an inverse to SALVSYNC_com_Link.
547          *  -- tkeiser 11/28/2007
548          */
549         strcpy(node->command.sop.partName, com->sop->partName);
550
551         if (AddToSalvageQueue(node)) {
552             code = SYNC_DENIED;
553         }
554         break;
555
556     default:
557         break;
558     }
559
560     if (hash) {
561         AddNodeToHash(node);
562     }
563
564     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
565     res->sop->state = node->state;
566     res->sop->prio = node->command.sop.prio;
567
568  done:
569     return code;
570 }
571
572 /**
573  * cancel a pending salvage request.
574  *
575  * @param[in]  com  inbound command object
576  * @param[out] res  outbound response object
577  *
578  * @return operation status
579  *    @retval SYNC_OK success
580  *    @retval SYNC_FAILED malformed command packet
581  *
582  * @note this is a SALVSYNC protocol rpc handler
583  *
584  * @internal
585  */
586 static afs_int32
587 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
588 {
589     afs_int32 code = SYNC_OK;
590     struct SalvageQueueNode * node;
591
592     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
593         code = SYNC_FAILED;
594         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
595         goto done;
596     }
597
598     node = LookupNodeByCommand(com->sop, NULL);
599
600     if (node == NULL) {
601         res->sop->state = SALVSYNC_STATE_UNKNOWN;
602         res->sop->prio = 0;
603     } else {
604         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
605         res->sop->prio = node->command.sop.prio;
606         res->sop->state = node->state;
607         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
608             (node->state == SALVSYNC_STATE_QUEUED)) {
609             DeleteFromSalvageQueue(node);
610         }
611     }
612
613  done:
614     return code;
615 }
616
617 /**
618  * cancel all pending salvage requests.
619  *
620  * @param[in]  com  incoming command object
621  * @param[out] res  outbound response object
622  *
623  * @return operation status
624  *    @retval SYNC_OK success
625  *
626  * @note this is a SALVSYNC protocol rpc handler
627  *
628  * @internal
629  */
630 static afs_int32
631 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
632 {
633     struct SalvageQueueNode * np, *nnp;
634     struct DiskPartition64 * dp;
635
636     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
637         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
638             DeleteFromSalvageQueue(np);
639         }
640     }
641
642     return SYNC_OK;
643 }
644
645 /**
646  * link a queue node for a clone to its parent volume.
647  *
648  * @param[in]  com   inbound command object
649  * @param[out] res   outbound response object
650  *
651  * @return operation status
652  *    @retval SYNC_OK success
653  *    @retval SYNC_FAILED malformed command packet
654  *    @retval SYNC_DENIED the request could not be completed
655  *
656  * @note this is a SALVSYNC protocol rpc handler
657  *
658  * @post the requested volume is marked as a child of another volume.
659  *       thus, future salvage requests for this volume will result in the
660  *       parent of the volume group being scheduled for salvage instead
661  *       of this clone.
662  *
663  * @internal
664  */
665 static afs_int32
666 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
667 {
668     afs_int32 code = SYNC_OK;
669     struct SalvageQueueNode * clone, * parent;
670
671     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
672         code = SYNC_FAILED;
673         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
674         goto done;
675     }
676
677     /* lookup clone's salvage scheduling node */
678     clone = LookupNodeByCommand(com->sop, NULL);
679     if (clone == NULL) {
680         code = SYNC_DENIED;
681         res->hdr->reason = SALVSYNC_REASON_ERROR;
682         goto done;
683     }
684
685     /* lookup parent's salvage scheduling node */
686     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
687     if (parent == NULL) {
688         if (AllocNode(&parent)) {
689             code = SYNC_DENIED;
690             res->hdr->reason = SYNC_REASON_NOMEM;
691             goto done;
692         }
693         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
694         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
695         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
696         AddNodeToHash(parent);
697     }
698
699     if (LinkNode(parent, clone)) {
700         code = SYNC_DENIED;
701         goto done;
702     }
703
704  done:
705     return code;
706 }
707
708 /**
709  * query the status of a volume salvage request.
710  *
711  * @param[in]  com   inbound command object
712  * @param[out] res   outbound response object
713  *
714  * @return operation status
715  *    @retval SYNC_OK success
716  *    @retval SYNC_FAILED malformed command packet
717  *
718  * @note this is a SALVSYNC protocol rpc handler
719  *
720  * @internal
721  */
722 static afs_int32
723 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
724 {
725     afs_int32 code = SYNC_OK;
726     struct SalvageQueueNode * node;
727
728     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
729         code = SYNC_FAILED;
730         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
731         goto done;
732     }
733
734     LookupNodeByCommand(com->sop, &node);
735
736     /* query whether a volume is done salvaging */
737     if (node == NULL) {
738         res->sop->state = SALVSYNC_STATE_UNKNOWN;
739         res->sop->prio = 0;
740     } else {
741         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
742         res->sop->state = node->state;
743         res->sop->prio = node->command.sop.prio;
744     }
745
746  done:
747     return code;
748 }
749
750 static void
751 SALVSYNC_Drop(osi_socket fd)
752 {
753     RemoveHandler(fd);
754     rk_closesocket(fd);
755     AcceptOn();
756 }
757
758 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
759
760 static void
761 AcceptOn(void)
762 {
763     if (AcceptHandler == -1) {
764         osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
765         AcceptHandler = FindHandler(salvsync_server_state.fd);
766     }
767 }
768
769 static void
770 AcceptOff(void)
771 {
772     if (AcceptHandler != -1) {
773         osi_Assert(RemoveHandler(salvsync_server_state.fd));
774         AcceptHandler = -1;
775     }
776 }
777
778 /* The multiple FD handling code. */
779
780 static void
781 InitHandler(void)
782 {
783     int i;
784     ObtainWriteLock(&SALVSYNC_handler_lock);
785     for (i = 0; i < MAXHANDLERS; i++) {
786         HandlerFD[i] = OSI_NULLSOCKET;
787         HandlerProc[i] = NULL;
788     }
789     ReleaseWriteLock(&SALVSYNC_handler_lock);
790 }
791
792 static void
793 CallHandler(fd_set * fdsetp)
794 {
795     int i;
796     ObtainReadLock(&SALVSYNC_handler_lock);
797     for (i = 0; i < MAXHANDLERS; i++) {
798         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
799             ReleaseReadLock(&SALVSYNC_handler_lock);
800             (*HandlerProc[i]) (HandlerFD[i]);
801             ObtainReadLock(&SALVSYNC_handler_lock);
802         }
803     }
804     ReleaseReadLock(&SALVSYNC_handler_lock);
805 }
806
807 static int
808 AddHandler(osi_socket afd, void (*aproc) (int))
809 {
810     int i;
811     ObtainWriteLock(&SALVSYNC_handler_lock);
812     for (i = 0; i < MAXHANDLERS; i++)
813         if (HandlerFD[i] == OSI_NULLSOCKET)
814             break;
815     if (i >= MAXHANDLERS) {
816         ReleaseWriteLock(&SALVSYNC_handler_lock);
817         return 0;
818     }
819     HandlerFD[i] = afd;
820     HandlerProc[i] = aproc;
821     ReleaseWriteLock(&SALVSYNC_handler_lock);
822     return 1;
823 }
824
825 static int
826 FindHandler(osi_socket afd)
827 {
828     int i;
829     ObtainReadLock(&SALVSYNC_handler_lock);
830     for (i = 0; i < MAXHANDLERS; i++)
831         if (HandlerFD[i] == afd) {
832             ReleaseReadLock(&SALVSYNC_handler_lock);
833             return i;
834         }
835     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
836     osi_Panic("Failed to find handler\n");
837     return -1;                  /* satisfy compiler */
838 }
839
840 static int
841 FindHandler_r(osi_socket afd)
842 {
843     int i;
844     for (i = 0; i < MAXHANDLERS; i++)
845         if (HandlerFD[i] == afd) {
846             return i;
847         }
848     osi_Panic("Failed to find handler\n");
849     return -1;                  /* satisfy compiler */
850 }
851
852 static int
853 RemoveHandler(osi_socket afd)
854 {
855     ObtainWriteLock(&SALVSYNC_handler_lock);
856     HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
857     ReleaseWriteLock(&SALVSYNC_handler_lock);
858     return 1;
859 }
860
861 static void
862 GetHandler(fd_set * fdsetp, int *maxfdp)
863 {
864     int i;
865     int maxfd = -1;
866     FD_ZERO(fdsetp);
867     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
868     for (i = 0; i < MAXHANDLERS; i++)
869         if (HandlerFD[i] != OSI_NULLSOCKET) {
870             FD_SET(HandlerFD[i], fdsetp);
871 #ifndef AFS_NT40_ENV
872             /* On Windows the nfds parameter to select() is ignored */
873             if (maxfd < HandlerFD[i] || maxfd == (int)-1)
874                 maxfd = HandlerFD[i];
875 #endif
876         }
877     *maxfdp = maxfd;
878     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
879 }
880
881 /**
882  * allocate a salvage queue node.
883  *
884  * @param[out] node_out  address in which to store new node pointer
885  *
886  * @return operation status
887  *    @retval 0 success
888  *    @retval 1 failed to allocate node
889  *
890  * @internal
891  */
892 static int
893 AllocNode(struct SalvageQueueNode ** node_out)
894 {
895     int code = 0;
896     struct SalvageQueueNode * node;
897
898     *node_out = node = (struct SalvageQueueNode *)
899         malloc(sizeof(struct SalvageQueueNode));
900     if (node == NULL) {
901         code = 1;
902         goto done;
903     }
904
905     memset(node, 0, sizeof(struct SalvageQueueNode));
906     node->type = SALVSYNC_VOLGROUP_PARENT;
907     node->state = SALVSYNC_STATE_UNKNOWN;
908
909  done:
910     return code;
911 }
912
913 /**
914  * link a salvage queue node to its parent.
915  *
916  * @param[in] parent  pointer to queue node for parent of volume group
917  * @param[in] clone   pointer to queue node for a clone
918  *
919  * @return operation status
920  *    @retval 0 success
921  *    @retval 1 failure
922  *
923  * @internal
924  */
925 static int
926 LinkNode(struct SalvageQueueNode * parent,
927          struct SalvageQueueNode * clone)
928 {
929     int code = 0;
930     int idx;
931
932     /* check for attaching a clone to a clone */
933     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
934         code = 1;
935         goto done;
936     }
937
938     /* check for pre-existing registration and openings */
939     for (idx = 0; idx < VOLMAXTYPES; idx++) {
940         if (parent->volgroup.children[idx] == clone) {
941             goto linked;
942         }
943         if (parent->volgroup.children[idx] == NULL) {
944             break;
945         }
946     }
947     if (idx == VOLMAXTYPES) {
948         code = 1;
949         goto done;
950     }
951
952     /* link parent and child */
953     parent->volgroup.children[idx] = clone;
954     clone->type = SALVSYNC_VOLGROUP_CLONE;
955     clone->volgroup.parent = parent;
956
957
958  linked:
959     switch (clone->state) {
960     case SALVSYNC_STATE_QUEUED:
961         DeleteFromSalvageQueue(clone);
962
963     case SALVSYNC_STATE_SALVAGING:
964         switch (parent->state) {
965         case SALVSYNC_STATE_UNKNOWN:
966         case SALVSYNC_STATE_ERROR:
967         case SALVSYNC_STATE_DONE:
968             parent->command.sop.prio = clone->command.sop.prio;
969             AddToSalvageQueue(parent);
970             break;
971
972         case SALVSYNC_STATE_QUEUED:
973             if (clone->command.sop.prio) {
974                 parent->command.sop.prio += clone->command.sop.prio;
975                 UpdateCommandPrio(parent);
976             }
977             break;
978
979         default:
980             break;
981         }
982         break;
983
984     default:
985         break;
986     }
987
988  done:
989     return code;
990 }
991
992 static void
993 HandlePrio(struct SalvageQueueNode * clone,
994            struct SalvageQueueNode * node,
995            afs_uint32 new_prio)
996 {
997     afs_uint32 delta;
998
999     switch (node->state) {
1000     case SALVSYNC_STATE_ERROR:
1001     case SALVSYNC_STATE_DONE:
1002     case SALVSYNC_STATE_UNKNOWN:
1003         node->command.sop.prio = 0;
1004         break;
1005     default:
1006         break;
1007     }
1008
1009     if (new_prio < clone->command.sop.prio) {
1010         /* strange. let's just set our delta to 1 */
1011         delta = 1;
1012     } else {
1013         delta = new_prio - clone->command.sop.prio;
1014     }
1015
1016     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1017         clone->command.sop.prio = new_prio;
1018     }
1019
1020     node->command.sop.prio += delta;
1021 }
1022
1023 static int
1024 AddToSalvageQueue(struct SalvageQueueNode * node)
1025 {
1026     afs_int32 id;
1027     struct SalvageQueueNode * last = NULL;
1028
1029     id = volutil_GetPartitionID(node->command.sop.partName);
1030     if (id < 0 || id > VOLMAXPARTS) {
1031         return 1;
1032     }
1033     if (!VGetPartitionById_r(id, 0)) {
1034         /* don't enqueue salvage requests for unmounted partitions */
1035         return 1;
1036     }
1037     if (queue_IsOnQueue(node)) {
1038         return 0;
1039     }
1040
1041     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1042         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1043     }
1044     queue_Append(&salvageQueue.part[id], node);
1045     salvageQueue.len[id]++;
1046     salvageQueue.total_len++;
1047     salvageQueue.last_insert = id;
1048     node->partition_id = id;
1049     node->state = SALVSYNC_STATE_QUEUED;
1050
1051     /* reorder, if necessary */
1052     if (last && last->command.sop.prio < node->command.sop.prio) {
1053         UpdateCommandPrio(node);
1054     }
1055
1056     CV_BROADCAST(&salvageQueue.cv);
1057     return 0;
1058 }
1059
1060 static void
1061 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1062 {
1063     if (queue_IsOnQueue(node)) {
1064         queue_Remove(node);
1065         salvageQueue.len[node->partition_id]--;
1066         salvageQueue.total_len--;
1067         node->state = SALVSYNC_STATE_UNKNOWN;
1068         CV_BROADCAST(&salvageQueue.cv);
1069     }
1070 }
1071
1072 static void
1073 AddToPendingQueue(struct SalvageQueueNode * node)
1074 {
1075     queue_Append(&pendingQueue, node);
1076     pendingQueue.len++;
1077     node->state = SALVSYNC_STATE_SALVAGING;
1078     CV_BROADCAST(&pendingQueue.queue_change_cv);
1079 }
1080
1081 static void
1082 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1083 {
1084     if (queue_IsOnQueue(node)) {
1085         queue_Remove(node);
1086         pendingQueue.len--;
1087         node->state = SALVSYNC_STATE_UNKNOWN;
1088         CV_BROADCAST(&pendingQueue.queue_change_cv);
1089     }
1090 }
1091
1092 #if 0
1093 static struct SalvageQueueNode *
1094 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1095 {
1096     struct SalvageQueueNode * np, * nnp;
1097
1098     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1099         if ((np->command.sop.volume == qry->volume) &&
1100             !strncmp(np->command.sop.partName, qry->partName,
1101                      sizeof(qry->partName)))
1102             break;
1103     }
1104
1105     if (queue_IsEnd(&pendingQueue, np))
1106         np = NULL;
1107     return np;
1108 }
1109 #endif
1110
1111 static struct SalvageQueueNode *
1112 LookupPendingCommandByPid(int pid)
1113 {
1114     struct SalvageQueueNode * np, * nnp;
1115
1116     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1117         if (np->pid == pid)
1118             break;
1119     }
1120
1121     if (queue_IsEnd(&pendingQueue, np))
1122         np = NULL;
1123     return np;
1124 }
1125
1126
1127 /* raise the priority of a previously scheduled salvage */
1128 static void
1129 UpdateCommandPrio(struct SalvageQueueNode * node)
1130 {
1131     struct SalvageQueueNode *np, *nnp;
1132     afs_int32 id;
1133     afs_uint32 prio;
1134
1135     osi_Assert(queue_IsOnQueue(node));
1136
1137     prio = node->command.sop.prio;
1138     id = node->partition_id;
1139     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1140         queue_Remove(node);
1141         queue_Prepend(&salvageQueue.part[id], node);
1142     } else {
1143         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1144             if (np->command.sop.prio > prio)
1145                 break;
1146         }
1147         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1148             queue_Remove(node);
1149             queue_Prepend(&salvageQueue.part[id], node);
1150         } else if (node != np) {
1151             queue_Remove(node);
1152             queue_InsertAfter(np, node);
1153         }
1154     }
1155 }
1156
1157 /* this will need to be rearchitected if we ever want more than one thread
1158  * to wait for new salvage nodes */
1159 struct SalvageQueueNode *
1160 SALVSYNC_getWork(void)
1161 {
1162     int i;
1163     struct DiskPartition64 * dp = NULL, * fdp;
1164     static afs_int32 next_part_sched = 0;
1165     struct SalvageQueueNode *node = NULL;
1166
1167     VOL_LOCK;
1168
1169     /*
1170      * wait for work to be scheduled
1171      * if there are no disk partitions, just sit in this wait loop forever
1172      */
1173     while (!salvageQueue.total_len || !DiskPartitionList) {
1174         VOL_CV_WAIT(&salvageQueue.cv);
1175     }
1176
1177     /*
1178      * short circuit for simple case where only one partition has
1179      * scheduled salvages
1180      */
1181     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1182         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1183         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1184         goto have_node;
1185     }
1186
1187
1188     /*
1189      * ok, more than one partition has scheduled salvages.
1190      * now search for partitions with scheduled salvages, but no pending salvages.
1191      */
1192     dp = VGetPartitionById_r(next_part_sched, 0);
1193     if (!dp) {
1194         dp = DiskPartitionList;
1195     }
1196     fdp = dp;
1197
1198     for (i=0 ;
1199          !i || dp != fdp ;
1200          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1201         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1202             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1203             goto have_node;
1204         }
1205     }
1206
1207
1208     /*
1209      * all partitions with scheduled salvages have at least one pending.
1210      * now do an exhaustive search for a scheduled salvage.
1211      */
1212     dp = fdp;
1213
1214     for (i=0 ;
1215          !i || dp != fdp ;
1216          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1217         if (salvageQueue.len[dp->index]) {
1218             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1219             goto have_node;
1220         }
1221     }
1222
1223     /* we should never reach this line */
1224     osi_Panic("Node not found\n");
1225
1226  have_node:
1227     osi_Assert(node != NULL);
1228     node->pid = 0;
1229     partition_salvaging[node->partition_id]++;
1230     DeleteFromSalvageQueue(node);
1231     AddToPendingQueue(node);
1232
1233     if (dp) {
1234         /* update next_part_sched field */
1235         if (dp->next) {
1236             next_part_sched = dp->next->index;
1237         } else if (DiskPartitionList) {
1238             next_part_sched = DiskPartitionList->index;
1239         } else {
1240             next_part_sched = -1;
1241         }
1242     }
1243
1244     VOL_UNLOCK;
1245     return node;
1246 }
1247
1248 /**
1249  * update internal scheduler state to reflect completion of a work unit.
1250  *
1251  * @param[in]  node    salvage queue node object pointer
1252  * @param[in]  result  worker process result code
1253  *
1254  * @post scheduler state is updated.
1255  *
1256  * @internal
1257  */
1258 static void
1259 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1260 {
1261     afs_int32 partid;
1262     int idx;
1263
1264     DeleteFromPendingQueue(node);
1265     partid = node->partition_id;
1266     if (partid >=0 && partid <= VOLMAXPARTS) {
1267         partition_salvaging[partid]--;
1268     }
1269     if (result == 0) {
1270         node->state = SALVSYNC_STATE_DONE;
1271     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1272         node->state = SALVSYNC_STATE_ERROR;
1273     }
1274
1275     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1276         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1277             if (node->volgroup.children[idx]) {
1278                 node->volgroup.children[idx]->state = node->state;
1279             }
1280         }
1281     }
1282 }
1283
1284 /**
1285  * check whether worker child failed.
1286  *
1287  * @param[in] status  status bitfield return by wait()
1288  *
1289  * @return boolean failure code
1290  *    @retval 0 child succeeded
1291  *    @retval 1 child failed
1292  *
1293  * @internal
1294  */
1295 static int
1296 ChildFailed(int status)
1297 {
1298     return (WCOREDUMP(status) ||
1299             WIFSIGNALED(status) ||
1300             ((WEXITSTATUS(status) != 0) &&
1301              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1302 }
1303
1304
1305 /**
1306  * notify salvsync scheduler of node completion, by child pid.
1307  *
1308  * @param[in]  pid     pid of worker child
1309  * @param[in]  status  worker status bitfield from wait()
1310  *
1311  * @post scheduler state is updated.
1312  *       if status code is a failure, fileserver notification was attempted
1313  *
1314  * @see SALVSYNC_doneWork_r
1315  */
1316 void
1317 SALVSYNC_doneWorkByPid(int pid, int status)
1318 {
1319     struct SalvageQueueNode * node;
1320     char partName[16];
1321     afs_uint32 volids[VOLMAXTYPES+1];
1322     unsigned int idx;
1323
1324     memset(volids, 0, sizeof(volids));
1325
1326     VOL_LOCK;
1327     node = LookupPendingCommandByPid(pid);
1328     if (node != NULL) {
1329         SALVSYNC_doneWork_r(node, status);
1330
1331         if (ChildFailed(status)) {
1332             /* populate volume id list for later processing outside the glock */
1333             volids[0] = node->command.sop.volume;
1334             strcpy(partName, node->command.sop.partName);
1335             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1336                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1337                     if (node->volgroup.children[idx]) {
1338                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1339                     }
1340                 }
1341             }
1342         }
1343     }
1344     VOL_UNLOCK;
1345
1346     /*
1347      * if necessary, notify fileserver of
1348      * failure to salvage volume group
1349      * [we cannot guarantee that the child made the
1350      *  appropriate notifications (e.g. SIGSEGV)]
1351      *  -- tkeiser 11/28/2007
1352      */
1353     if (ChildFailed(status)) {
1354         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1355             if (volids[idx]) {
1356                 FSYNC_VolOp(volids[idx],
1357                             partName,
1358                             FSYNC_VOL_FORCE_ERROR,
1359                             FSYNC_WHATEVER,
1360                             NULL);
1361             }
1362         }
1363     }
1364 }
1365
1366 #endif /* AFS_DEMAND_ATTACH_FS */