b399d8164940f00f472315bcc1278f52bc0b54d9
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 #include <afs/procmgmt.h>
30 #include <roken.h>
31
32 #include <stddef.h>
33
34 #include <afs/opr.h>
35 #include <opr/lock.h>
36 #include <afs/afsint.h>
37 #include <rx/rx_queue.h>
38
39 #include "nfs.h"
40 #include <afs/errors.h>
41 #include "salvsync.h"
42 #include "lock.h"
43 #include <afs/afssyscalls.h>
44 #include "ihandle.h"
45 #include "vnode.h"
46 #include "volume.h"
47 #include "partition.h"
48 #include "common.h"
49 #include <rx/rx_queue.h>
50
51 #ifdef USE_UNIX_SOCKETS
52 #include <afs/afsutil.h>
53 #include <sys/un.h>
54 #endif
55
56 #ifndef WCOREDUMP
57 #define WCOREDUMP(x)    ((x) & 0200)
58 #endif
59
60 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
61                                  * move = dump+restore can run on single server */
62
63
64 /*
65  * This lock controls access to the handler array.
66  */
67 struct Lock SALVSYNC_handler_lock;
68
69
70 #ifdef AFS_DEMAND_ATTACH_FS
71 /*
72  * SALVSYNC is a feature specific to the demand attach fileserver
73  */
74
75 /* Forward declarations */
76 static void * SALVSYNC_syncThread(void *);
77 static void SALVSYNC_newconnection(osi_socket fd);
78 static void SALVSYNC_com(osi_socket fd);
79 static void SALVSYNC_Drop(osi_socket fd);
80 static void AcceptOn(void);
81 static void AcceptOff(void);
82 static void InitHandler(void);
83 static void CallHandler(fd_set * fdsetp);
84 static int AddHandler(osi_socket afd, void (*aproc) (int));
85 static int FindHandler(osi_socket afd);
86 static int FindHandler_r(osi_socket afd);
87 static int RemoveHandler(osi_socket afd);
88 static void GetHandler(fd_set * fdsetp, int *maxfdp);
89
90 static int AllocNode(struct SalvageQueueNode ** node);
91
92 static int AddToSalvageQueue(struct SalvageQueueNode * node);
93 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
94 static void AddToPendingQueue(struct SalvageQueueNode * node);
95 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
96 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
97 static void UpdateCommandPrio(struct SalvageQueueNode * node);
98 static void HandlePrio(struct SalvageQueueNode * clone,
99                        struct SalvageQueueNode * parent,
100                        afs_uint32 new_prio);
101
102 static int LinkNode(struct SalvageQueueNode * parent,
103                     struct SalvageQueueNode * clone);
104
105 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
106                                             struct SalvageQueueNode ** parent);
107 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
108                                                      struct SalvageQueueNode ** parent);
109 static void AddNodeToHash(struct SalvageQueueNode * node);
110
111 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
112 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
113 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
114 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
115 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
116
117
118 extern int VInit;
119 extern pthread_mutex_t vol_salvsync_mutex;
120
121 /**
122  * salvsync server socket handle.
123  */
124 static SYNC_server_state_t salvsync_server_state =
125     { OSI_NULLSOCKET,                       /* file descriptor */
126       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
127       SALVSYNC_PROTO_VERSION,   /* protocol version */
128       5,                        /* bind() retry limit */
129       100,                      /* listen() queue depth */
130       "SALVSYNC",               /* protocol name string */
131     };
132
133
134 /**
135  * queue of all volumes waiting to be salvaged.
136  */
137 struct SalvageQueue {
138     volatile int total_len;
139     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
140     volatile int len[VOLMAXPARTS+1];
141     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
142     pthread_cond_t cv;
143 };
144 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
145
146 /**
147  * queue of all volumes currently being salvaged.
148  */
149 struct QueueHead {
150     volatile struct rx_queue q;  /**< queue of salvages in progress */
151     volatile int len;            /**< length of in-progress queue */
152     pthread_cond_t queue_change_cv;
153 };
154 static struct QueueHead pendingQueue;  /* volumes being salvaged */
155
156 /* XXX
157  * whether a partition has a salvage in progress
158  *
159  * the salvager code only permits one salvage per partition at a time
160  *
161  * the following hack tries to keep salvaged parallelism high by
162  * only permitting one salvage dispatch per partition at a time
163  *
164  * unfortunately, the parallel salvager currently
165  * has a rather braindead routine that won't permit
166  * multiple salvages on the same "device".  this
167  * function happens to break pretty badly on lvm, raid luns, etc.
168  *
169  * this hack isn't good enough to stop the device limiting code from
170  * crippling performance.  someday that code needs to be rewritten
171  */
172 static int partition_salvaging[VOLMAXPARTS+1];
173
174 static int HandlerFD[MAXHANDLERS];
175 static void (*HandlerProc[MAXHANDLERS]) (int);
176
177 #define VSHASH_SIZE 64
178 #define VSHASH_MASK (VSHASH_SIZE-1)
179 #define VSHASH(vid) ((vid)&VSHASH_MASK)
180
181 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
182
183 static struct SalvageQueueNode *
184 LookupNode(VolumeId vid, char * partName,
185            struct SalvageQueueNode ** parent)
186 {
187     struct rx_queue *qp, *nqp;
188     struct SalvageQueueNode *vsp = NULL;
189     int idx = VSHASH(vid);
190
191     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
192         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
193         if ((vsp->command.sop.volume == vid) &&
194             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
195             break;
196         }
197     }
198
199     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
200         vsp = NULL;
201     }
202
203     if (parent) {
204         if (vsp) {
205             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
206                 vsp->volgroup.parent : vsp;
207         } else {
208             *parent = NULL;
209         }
210     }
211
212     return vsp;
213 }
214
215 static struct SalvageQueueNode *
216 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
217                     struct SalvageQueueNode ** parent)
218 {
219     return LookupNode(qry->volume, qry->partName, parent);
220 }
221
222 static void
223 AddNodeToHash(struct SalvageQueueNode * node)
224 {
225     int idx = VSHASH(node->command.sop.volume);
226
227     if (queue_IsOnQueue(&node->hash_chain)) {
228         return;
229     }
230
231     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
232     SalvageHashTable[idx].len++;
233 }
234
235 #if 0
236 static void
237 DeleteNodeFromHash(struct SalvageQueueNode * node)
238 {
239     int idx = VSHASH(node->command.sop.volume);
240
241     if (queue_IsNotOnQueue(&node->hash_chain)) {
242         return;
243     }
244
245     queue_Remove(&node->hash_chain);
246     SalvageHashTable[idx].len--;
247 }
248 #endif
249
250 void
251 SALVSYNC_salvInit(void)
252 {
253     int i;
254     pthread_t tid;
255     pthread_attr_t tattr;
256
257     /* initialize the queues */
258     Lock_Init(&SALVSYNC_handler_lock);
259     CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
260     for (i = 0; i <= VOLMAXPARTS; i++) {
261         queue_Init(&salvageQueue.part[i]);
262         salvageQueue.len[i] = 0;
263     }
264     CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
265     queue_Init(&pendingQueue);
266     salvageQueue.total_len = pendingQueue.len = 0;
267     salvageQueue.last_insert = -1;
268     memset(partition_salvaging, 0, sizeof(partition_salvaging));
269
270     for (i = 0; i < VSHASH_SIZE; i++) {
271         CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
272         SalvageHashTable[i].len = 0;
273         queue_Init(&SalvageHashTable[i]);
274     }
275
276     /* start the salvsync thread */
277     opr_Verify(pthread_attr_init(&tattr) == 0);
278     opr_Verify(pthread_attr_setdetachstate(&tattr,
279                                            PTHREAD_CREATE_DETACHED) == 0);
280     opr_Verify(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
281 }
282
283 static void
284 CleanFDs(void)
285 {
286     int i;
287     for (i = 0; i < MAXHANDLERS; ++i) {
288         if (HandlerFD[i] >= 0) {
289             SALVSYNC_Drop(HandlerFD[i]);
290         }
291     }
292
293     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
294      * have a handler */
295     close(salvsync_server_state.fd);
296     salvsync_server_state.fd = OSI_NULLSOCKET;
297 }
298
299 static fd_set SALVSYNC_readfds;
300
301 static void *
302 SALVSYNC_syncThread(void * args)
303 {
304     int code;
305     SYNC_server_state_t * state = &salvsync_server_state;
306
307     /* when we fork, the child needs to close the salvsync server sockets,
308      * otherwise, it may get salvsync requests, instead of the parent
309      * salvageserver */
310     opr_Verify(pthread_atfork(NULL, NULL, CleanFDs) == 0);
311
312     SYNC_getAddr(&state->endpoint, &state->addr);
313     SYNC_cleanupSock(state);
314
315 #ifndef AFS_NT40_ENV
316     (void)signal(SIGPIPE, SIG_IGN);
317 #endif
318
319     state->fd = SYNC_getSock(&state->endpoint);
320     code = SYNC_bindSock(state);
321     opr_Assert(!code);
322
323     InitHandler();
324     AcceptOn();
325
326     for (;;) {
327         int maxfd;
328         struct timeval s_timeout;
329         GetHandler(&SALVSYNC_readfds, &maxfd);
330         s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
331         s_timeout.tv_usec = 0;
332         /* Note: check for >= 1 below is essential since IOMGR_select
333          * doesn't have exactly same semantics as select.
334          */
335         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
336             CallHandler(&SALVSYNC_readfds);
337     }
338
339     return NULL;
340 }
341
342 static void
343 SALVSYNC_newconnection(int afd)
344 {
345 #ifdef USE_UNIX_SOCKETS
346     struct sockaddr_un other;
347 #else  /* USE_UNIX_SOCKETS */
348     struct sockaddr_in other;
349 #endif
350     int fd;
351     socklen_t junk;
352
353     junk = sizeof(other);
354     fd = accept(afd, (struct sockaddr *)&other, &junk);
355     if (fd == OSI_NULLSOCKET) {
356         osi_Panic("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
357     } else if (!AddHandler(fd, SALVSYNC_com)) {
358         AcceptOff();
359         opr_Verify(AddHandler(fd, SALVSYNC_com));
360     }
361 }
362
363 /* this function processes commands from an salvsync file descriptor (fd) */
364 static afs_int32 SALV_cnt = 0;
365 static void
366 SALVSYNC_com(osi_socket fd)
367 {
368     SYNC_command com;
369     SYNC_response res;
370     SALVSYNC_response_hdr sres_hdr;
371     SALVSYNC_command scom;
372     SALVSYNC_response sres;
373     SYNC_PROTO_BUF_DECL(buf);
374
375     memset(&com, 0, sizeof(com));
376     memset(&res, 0, sizeof(res));
377     memset(&scom, 0, sizeof(scom));
378     memset(&sres, 0, sizeof(sres));
379     memset(&sres_hdr, 0, sizeof(sres_hdr));
380
381     com.payload.buf = (void *)buf;
382     com.payload.len = SYNC_PROTO_MAX_LEN;
383     res.payload.buf = (void *) &sres_hdr;
384     res.payload.len = sizeof(sres_hdr);
385     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
386     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
387
388     scom.hdr = &com.hdr;
389     scom.sop = (SALVSYNC_command_hdr *) buf;
390     scom.com = &com;
391     sres.hdr = &res.hdr;
392     sres.sop = &sres_hdr;
393     sres.res = &res;
394
395     SALV_cnt++;
396     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
397         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
398         SALVSYNC_Drop(fd);
399         return;
400     }
401
402     if (com.recv_len < sizeof(com.hdr)) {
403         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
404         res.hdr.response = SYNC_COM_ERROR;
405         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
406         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
407         goto respond;
408     }
409
410     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
411         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
412         res.hdr.response = SYNC_COM_ERROR;
413         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
414         goto respond;
415     }
416
417     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
418         res.hdr.response = SYNC_OK;
419         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
420
421         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
422          * never wait for a response. */
423         goto done;
424     }
425
426     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
427         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
428         res.hdr.response = SYNC_COM_ERROR;
429         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
430         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
431         goto respond;
432     }
433
434     res.hdr.com_seq = com.hdr.com_seq;
435
436     VOL_LOCK;
437     switch (com.hdr.command) {
438     case SALVSYNC_NOP:
439         break;
440     case SALVSYNC_SALVAGE:
441     case SALVSYNC_RAISEPRIO:
442         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
443         break;
444     case SALVSYNC_CANCEL:
445         /* cancel a salvage */
446         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
447         break;
448     case SALVSYNC_CANCELALL:
449         /* cancel all queued salvages */
450         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
451         break;
452     case SALVSYNC_QUERY:
453         /* query whether a volume is done salvaging */
454         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
455         break;
456     case SALVSYNC_OP_LINK:
457         /* link a clone to its parent in the scheduler */
458         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
459         break;
460     default:
461         res.hdr.response = SYNC_BAD_COMMAND;
462         break;
463     }
464
465     sres_hdr.sq_len = salvageQueue.total_len;
466     sres_hdr.pq_len = pendingQueue.len;
467     VOL_UNLOCK;
468
469  respond:
470     SYNC_putRes(&salvsync_server_state, fd, &res);
471
472  done:
473     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
474         SALVSYNC_Drop(fd);
475     }
476 }
477
478 /**
479  * request that a volume be salvaged.
480  *
481  * @param[in]  com  inbound command object
482  * @param[out] res  outbound response object
483  *
484  * @return operation status
485  *    @retval SYNC_OK success
486  *    @retval SYNC_DENIED failed to enqueue request
487  *    @retval SYNC_FAILED malformed command packet
488  *
489  * @note this is a SALVSYNC protocol rpc handler
490  *
491  * @internal
492  *
493  * @post the volume is enqueued in the to-be-salvaged queue.
494  *       if the volume was already in the salvage queue, its
495  *       priority (and thus its location in the queue) are
496  *       updated.
497  */
498 static afs_int32
499 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
500 {
501     afs_int32 code = SYNC_OK;
502     struct SalvageQueueNode * node, * clone;
503     int hash = 0;
504
505     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
506         code = SYNC_FAILED;
507         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
508         goto done;
509     }
510
511     clone = LookupNodeByCommand(com->sop, &node);
512
513     if (node == NULL) {
514         if (AllocNode(&node)) {
515             code = SYNC_DENIED;
516             res->hdr->reason = SYNC_REASON_NOMEM;
517             goto done;
518         }
519         clone = node;
520         hash = 1;
521     }
522
523     HandlePrio(clone, node, com->sop->prio);
524
525     switch (node->state) {
526     case SALVSYNC_STATE_QUEUED:
527         UpdateCommandPrio(node);
528         break;
529
530     case SALVSYNC_STATE_ERROR:
531     case SALVSYNC_STATE_DONE:
532     case SALVSYNC_STATE_UNKNOWN:
533         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
534         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
535
536         /*
537          * make sure volgroup parent partition path is kept coherent
538          *
539          * If we ever want to support non-COW clones on a machine holding
540          * the RW site, please note that this code does not work under the
541          * conditions where someone zaps a COW clone on partition X, and
542          * subsequently creates a full clone on partition Y -- we'd need
543          * an inverse to SALVSYNC_com_Link.
544          *  -- tkeiser 11/28/2007
545          */
546         strcpy(node->command.sop.partName, com->sop->partName);
547
548         if (AddToSalvageQueue(node)) {
549             code = SYNC_DENIED;
550         }
551         break;
552
553     default:
554         break;
555     }
556
557     if (hash) {
558         AddNodeToHash(node);
559     }
560
561     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
562     res->sop->state = node->state;
563     res->sop->prio = node->command.sop.prio;
564
565  done:
566     return code;
567 }
568
569 /**
570  * cancel a pending salvage request.
571  *
572  * @param[in]  com  inbound command object
573  * @param[out] res  outbound response object
574  *
575  * @return operation status
576  *    @retval SYNC_OK success
577  *    @retval SYNC_FAILED malformed command packet
578  *
579  * @note this is a SALVSYNC protocol rpc handler
580  *
581  * @internal
582  */
583 static afs_int32
584 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
585 {
586     afs_int32 code = SYNC_OK;
587     struct SalvageQueueNode * node;
588
589     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
590         code = SYNC_FAILED;
591         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
592         goto done;
593     }
594
595     node = LookupNodeByCommand(com->sop, NULL);
596
597     if (node == NULL) {
598         res->sop->state = SALVSYNC_STATE_UNKNOWN;
599         res->sop->prio = 0;
600     } else {
601         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
602         res->sop->prio = node->command.sop.prio;
603         res->sop->state = node->state;
604         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
605             (node->state == SALVSYNC_STATE_QUEUED)) {
606             DeleteFromSalvageQueue(node);
607         }
608     }
609
610  done:
611     return code;
612 }
613
614 /**
615  * cancel all pending salvage requests.
616  *
617  * @param[in]  com  incoming command object
618  * @param[out] res  outbound response object
619  *
620  * @return operation status
621  *    @retval SYNC_OK success
622  *
623  * @note this is a SALVSYNC protocol rpc handler
624  *
625  * @internal
626  */
627 static afs_int32
628 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
629 {
630     struct SalvageQueueNode * np, *nnp;
631     struct DiskPartition64 * dp;
632
633     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
634         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
635             DeleteFromSalvageQueue(np);
636         }
637     }
638
639     return SYNC_OK;
640 }
641
642 /**
643  * link a queue node for a clone to its parent volume.
644  *
645  * @param[in]  com   inbound command object
646  * @param[out] res   outbound response object
647  *
648  * @return operation status
649  *    @retval SYNC_OK success
650  *    @retval SYNC_FAILED malformed command packet
651  *    @retval SYNC_DENIED the request could not be completed
652  *
653  * @note this is a SALVSYNC protocol rpc handler
654  *
655  * @post the requested volume is marked as a child of another volume.
656  *       thus, future salvage requests for this volume will result in the
657  *       parent of the volume group being scheduled for salvage instead
658  *       of this clone.
659  *
660  * @internal
661  */
662 static afs_int32
663 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
664 {
665     afs_int32 code = SYNC_OK;
666     struct SalvageQueueNode * clone, * parent;
667
668     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
669         code = SYNC_FAILED;
670         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
671         goto done;
672     }
673
674     /* lookup clone's salvage scheduling node */
675     clone = LookupNodeByCommand(com->sop, NULL);
676     if (clone == NULL) {
677         code = SYNC_DENIED;
678         res->hdr->reason = SALVSYNC_REASON_ERROR;
679         goto done;
680     }
681
682     /* lookup parent's salvage scheduling node */
683     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
684     if (parent == NULL) {
685         if (AllocNode(&parent)) {
686             code = SYNC_DENIED;
687             res->hdr->reason = SYNC_REASON_NOMEM;
688             goto done;
689         }
690         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
691         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
692         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
693         AddNodeToHash(parent);
694     }
695
696     if (LinkNode(parent, clone)) {
697         code = SYNC_DENIED;
698         goto done;
699     }
700
701  done:
702     return code;
703 }
704
705 /**
706  * query the status of a volume salvage request.
707  *
708  * @param[in]  com   inbound command object
709  * @param[out] res   outbound response object
710  *
711  * @return operation status
712  *    @retval SYNC_OK success
713  *    @retval SYNC_FAILED malformed command packet
714  *
715  * @note this is a SALVSYNC protocol rpc handler
716  *
717  * @internal
718  */
719 static afs_int32
720 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
721 {
722     afs_int32 code = SYNC_OK;
723     struct SalvageQueueNode * node;
724
725     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
726         code = SYNC_FAILED;
727         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
728         goto done;
729     }
730
731     LookupNodeByCommand(com->sop, &node);
732
733     /* query whether a volume is done salvaging */
734     if (node == NULL) {
735         res->sop->state = SALVSYNC_STATE_UNKNOWN;
736         res->sop->prio = 0;
737     } else {
738         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
739         res->sop->state = node->state;
740         res->sop->prio = node->command.sop.prio;
741     }
742
743  done:
744     return code;
745 }
746
747 static void
748 SALVSYNC_Drop(osi_socket fd)
749 {
750     RemoveHandler(fd);
751     rk_closesocket(fd);
752     AcceptOn();
753 }
754
755 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
756
757 static void
758 AcceptOn(void)
759 {
760     if (AcceptHandler == -1) {
761         opr_Verify(AddHandler(salvsync_server_state.fd,
762                               SALVSYNC_newconnection));
763         AcceptHandler = FindHandler(salvsync_server_state.fd);
764     }
765 }
766
767 static void
768 AcceptOff(void)
769 {
770     if (AcceptHandler != -1) {
771         opr_Verify(RemoveHandler(salvsync_server_state.fd));
772         AcceptHandler = -1;
773     }
774 }
775
776 /* The multiple FD handling code. */
777
778 static void
779 InitHandler(void)
780 {
781     int i;
782     ObtainWriteLock(&SALVSYNC_handler_lock);
783     for (i = 0; i < MAXHANDLERS; i++) {
784         HandlerFD[i] = OSI_NULLSOCKET;
785         HandlerProc[i] = NULL;
786     }
787     ReleaseWriteLock(&SALVSYNC_handler_lock);
788 }
789
790 static void
791 CallHandler(fd_set * fdsetp)
792 {
793     int i;
794     ObtainReadLock(&SALVSYNC_handler_lock);
795     for (i = 0; i < MAXHANDLERS; i++) {
796         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
797             ReleaseReadLock(&SALVSYNC_handler_lock);
798             (*HandlerProc[i]) (HandlerFD[i]);
799             ObtainReadLock(&SALVSYNC_handler_lock);
800         }
801     }
802     ReleaseReadLock(&SALVSYNC_handler_lock);
803 }
804
805 static int
806 AddHandler(osi_socket afd, void (*aproc) (int))
807 {
808     int i;
809     ObtainWriteLock(&SALVSYNC_handler_lock);
810     for (i = 0; i < MAXHANDLERS; i++)
811         if (HandlerFD[i] == OSI_NULLSOCKET)
812             break;
813     if (i >= MAXHANDLERS) {
814         ReleaseWriteLock(&SALVSYNC_handler_lock);
815         return 0;
816     }
817     HandlerFD[i] = afd;
818     HandlerProc[i] = aproc;
819     ReleaseWriteLock(&SALVSYNC_handler_lock);
820     return 1;
821 }
822
823 static int
824 FindHandler(osi_socket afd)
825 {
826     int i;
827     ObtainReadLock(&SALVSYNC_handler_lock);
828     for (i = 0; i < MAXHANDLERS; i++)
829         if (HandlerFD[i] == afd) {
830             ReleaseReadLock(&SALVSYNC_handler_lock);
831             return i;
832         }
833     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
834     osi_Panic("Failed to find handler\n");
835     return -1;                  /* satisfy compiler */
836 }
837
838 static int
839 FindHandler_r(osi_socket afd)
840 {
841     int i;
842     for (i = 0; i < MAXHANDLERS; i++)
843         if (HandlerFD[i] == afd) {
844             return i;
845         }
846     osi_Panic("Failed to find handler\n");
847     return -1;                  /* satisfy compiler */
848 }
849
850 static int
851 RemoveHandler(osi_socket afd)
852 {
853     ObtainWriteLock(&SALVSYNC_handler_lock);
854     HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
855     ReleaseWriteLock(&SALVSYNC_handler_lock);
856     return 1;
857 }
858
859 static void
860 GetHandler(fd_set * fdsetp, int *maxfdp)
861 {
862     int i;
863     int maxfd = -1;
864     FD_ZERO(fdsetp);
865     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
866     for (i = 0; i < MAXHANDLERS; i++)
867         if (HandlerFD[i] != OSI_NULLSOCKET) {
868             FD_SET(HandlerFD[i], fdsetp);
869 #ifndef AFS_NT40_ENV
870             /* On Windows the nfds parameter to select() is ignored */
871             if (maxfd < HandlerFD[i] || maxfd == (int)-1)
872                 maxfd = HandlerFD[i];
873 #endif
874         }
875     *maxfdp = maxfd;
876     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
877 }
878
879 /**
880  * allocate a salvage queue node.
881  *
882  * @param[out] node_out  address in which to store new node pointer
883  *
884  * @return operation status
885  *    @retval 0 success
886  *    @retval 1 failed to allocate node
887  *
888  * @internal
889  */
890 static int
891 AllocNode(struct SalvageQueueNode ** node_out)
892 {
893     int code = 0;
894     struct SalvageQueueNode * node;
895
896     *node_out = node = calloc(1, sizeof(struct SalvageQueueNode));
897     if (node == NULL) {
898         code = 1;
899         goto done;
900     }
901
902     node->type = SALVSYNC_VOLGROUP_PARENT;
903     node->state = SALVSYNC_STATE_UNKNOWN;
904
905  done:
906     return code;
907 }
908
909 /**
910  * link a salvage queue node to its parent.
911  *
912  * @param[in] parent  pointer to queue node for parent of volume group
913  * @param[in] clone   pointer to queue node for a clone
914  *
915  * @return operation status
916  *    @retval 0 success
917  *    @retval 1 failure
918  *
919  * @internal
920  */
921 static int
922 LinkNode(struct SalvageQueueNode * parent,
923          struct SalvageQueueNode * clone)
924 {
925     int code = 0;
926     int idx;
927
928     /* check for attaching a clone to a clone */
929     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
930         code = 1;
931         goto done;
932     }
933
934     /* check for pre-existing registration and openings */
935     for (idx = 0; idx < VOLMAXTYPES; idx++) {
936         if (parent->volgroup.children[idx] == clone) {
937             goto linked;
938         }
939         if (parent->volgroup.children[idx] == NULL) {
940             break;
941         }
942     }
943     if (idx == VOLMAXTYPES) {
944         code = 1;
945         goto done;
946     }
947
948     /* link parent and child */
949     parent->volgroup.children[idx] = clone;
950     clone->type = SALVSYNC_VOLGROUP_CLONE;
951     clone->volgroup.parent = parent;
952
953
954  linked:
955     switch (clone->state) {
956     case SALVSYNC_STATE_QUEUED:
957         DeleteFromSalvageQueue(clone);
958
959     case SALVSYNC_STATE_SALVAGING:
960         switch (parent->state) {
961         case SALVSYNC_STATE_UNKNOWN:
962         case SALVSYNC_STATE_ERROR:
963         case SALVSYNC_STATE_DONE:
964             parent->command.sop.prio = clone->command.sop.prio;
965             AddToSalvageQueue(parent);
966             break;
967
968         case SALVSYNC_STATE_QUEUED:
969             if (clone->command.sop.prio) {
970                 parent->command.sop.prio += clone->command.sop.prio;
971                 UpdateCommandPrio(parent);
972             }
973             break;
974
975         default:
976             break;
977         }
978         break;
979
980     default:
981         break;
982     }
983
984  done:
985     return code;
986 }
987
988 static void
989 HandlePrio(struct SalvageQueueNode * clone,
990            struct SalvageQueueNode * node,
991            afs_uint32 new_prio)
992 {
993     afs_uint32 delta;
994
995     switch (node->state) {
996     case SALVSYNC_STATE_ERROR:
997     case SALVSYNC_STATE_DONE:
998     case SALVSYNC_STATE_UNKNOWN:
999         node->command.sop.prio = 0;
1000         break;
1001     default:
1002         break;
1003     }
1004
1005     if (new_prio < clone->command.sop.prio) {
1006         /* strange. let's just set our delta to 1 */
1007         delta = 1;
1008     } else {
1009         delta = new_prio - clone->command.sop.prio;
1010     }
1011
1012     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1013         clone->command.sop.prio = new_prio;
1014     }
1015
1016     node->command.sop.prio += delta;
1017 }
1018
1019 static int
1020 AddToSalvageQueue(struct SalvageQueueNode * node)
1021 {
1022     afs_int32 id;
1023     struct SalvageQueueNode * last = NULL;
1024
1025     id = volutil_GetPartitionID(node->command.sop.partName);
1026     if (id < 0 || id > VOLMAXPARTS) {
1027         return 1;
1028     }
1029     if (!VGetPartitionById_r(id, 0)) {
1030         /* don't enqueue salvage requests for unmounted partitions */
1031         return 1;
1032     }
1033     if (queue_IsOnQueue(node)) {
1034         return 0;
1035     }
1036
1037     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1038         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1039     }
1040     queue_Append(&salvageQueue.part[id], node);
1041     salvageQueue.len[id]++;
1042     salvageQueue.total_len++;
1043     salvageQueue.last_insert = id;
1044     node->partition_id = id;
1045     node->state = SALVSYNC_STATE_QUEUED;
1046
1047     /* reorder, if necessary */
1048     if (last && last->command.sop.prio < node->command.sop.prio) {
1049         UpdateCommandPrio(node);
1050     }
1051
1052     CV_BROADCAST(&salvageQueue.cv);
1053     return 0;
1054 }
1055
1056 static void
1057 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1058 {
1059     if (queue_IsOnQueue(node)) {
1060         queue_Remove(node);
1061         salvageQueue.len[node->partition_id]--;
1062         salvageQueue.total_len--;
1063         node->state = SALVSYNC_STATE_UNKNOWN;
1064         CV_BROADCAST(&salvageQueue.cv);
1065     }
1066 }
1067
1068 static void
1069 AddToPendingQueue(struct SalvageQueueNode * node)
1070 {
1071     queue_Append(&pendingQueue, node);
1072     pendingQueue.len++;
1073     node->state = SALVSYNC_STATE_SALVAGING;
1074     CV_BROADCAST(&pendingQueue.queue_change_cv);
1075 }
1076
1077 static void
1078 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1079 {
1080     if (queue_IsOnQueue(node)) {
1081         queue_Remove(node);
1082         pendingQueue.len--;
1083         node->state = SALVSYNC_STATE_UNKNOWN;
1084         CV_BROADCAST(&pendingQueue.queue_change_cv);
1085     }
1086 }
1087
1088 #if 0
1089 static struct SalvageQueueNode *
1090 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1091 {
1092     struct SalvageQueueNode * np, * nnp;
1093
1094     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1095         if ((np->command.sop.volume == qry->volume) &&
1096             !strncmp(np->command.sop.partName, qry->partName,
1097                      sizeof(qry->partName)))
1098             break;
1099     }
1100
1101     if (queue_IsEnd(&pendingQueue, np))
1102         np = NULL;
1103     return np;
1104 }
1105 #endif
1106
1107 static struct SalvageQueueNode *
1108 LookupPendingCommandByPid(int pid)
1109 {
1110     struct SalvageQueueNode * np, * nnp;
1111
1112     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1113         if (np->pid == pid)
1114             break;
1115     }
1116
1117     if (queue_IsEnd(&pendingQueue, np))
1118         np = NULL;
1119     return np;
1120 }
1121
1122
1123 /* raise the priority of a previously scheduled salvage */
1124 static void
1125 UpdateCommandPrio(struct SalvageQueueNode * node)
1126 {
1127     struct SalvageQueueNode *np, *nnp;
1128     afs_int32 id;
1129     afs_uint32 prio;
1130
1131     opr_Assert(queue_IsOnQueue(node));
1132
1133     prio = node->command.sop.prio;
1134     id = node->partition_id;
1135     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1136         queue_Remove(node);
1137         queue_Prepend(&salvageQueue.part[id], node);
1138     } else {
1139         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1140             if (np->command.sop.prio > prio)
1141                 break;
1142         }
1143         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1144             queue_Remove(node);
1145             queue_Prepend(&salvageQueue.part[id], node);
1146         } else if (node != np) {
1147             queue_Remove(node);
1148             queue_InsertAfter(np, node);
1149         }
1150     }
1151 }
1152
1153 /* this will need to be rearchitected if we ever want more than one thread
1154  * to wait for new salvage nodes */
1155 struct SalvageQueueNode *
1156 SALVSYNC_getWork(void)
1157 {
1158     int i;
1159     struct DiskPartition64 * dp = NULL, * fdp;
1160     static afs_int32 next_part_sched = 0;
1161     struct SalvageQueueNode *node = NULL;
1162
1163     VOL_LOCK;
1164
1165     /*
1166      * wait for work to be scheduled
1167      * if there are no disk partitions, just sit in this wait loop forever
1168      */
1169     while (!salvageQueue.total_len || !DiskPartitionList) {
1170         VOL_CV_WAIT(&salvageQueue.cv);
1171     }
1172
1173     /*
1174      * short circuit for simple case where only one partition has
1175      * scheduled salvages
1176      */
1177     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1178         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1179         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1180         goto have_node;
1181     }
1182
1183
1184     /*
1185      * ok, more than one partition has scheduled salvages.
1186      * now search for partitions with scheduled salvages, but no pending salvages.
1187      */
1188     dp = VGetPartitionById_r(next_part_sched, 0);
1189     if (!dp) {
1190         dp = DiskPartitionList;
1191     }
1192     fdp = dp;
1193
1194     for (i=0 ;
1195          !i || dp != fdp ;
1196          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1197         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1198             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1199             goto have_node;
1200         }
1201     }
1202
1203
1204     /*
1205      * all partitions with scheduled salvages have at least one pending.
1206      * now do an exhaustive search for a scheduled salvage.
1207      */
1208     dp = fdp;
1209
1210     for (i=0 ;
1211          !i || dp != fdp ;
1212          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1213         if (salvageQueue.len[dp->index]) {
1214             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1215             goto have_node;
1216         }
1217     }
1218
1219     /* we should never reach this line */
1220     osi_Panic("Node not found\n");
1221
1222  have_node:
1223     opr_Assert(node != NULL);
1224     node->pid = 0;
1225     partition_salvaging[node->partition_id]++;
1226     DeleteFromSalvageQueue(node);
1227     AddToPendingQueue(node);
1228
1229     if (dp) {
1230         /* update next_part_sched field */
1231         if (dp->next) {
1232             next_part_sched = dp->next->index;
1233         } else if (DiskPartitionList) {
1234             next_part_sched = DiskPartitionList->index;
1235         } else {
1236             next_part_sched = -1;
1237         }
1238     }
1239
1240     VOL_UNLOCK;
1241     return node;
1242 }
1243
1244 /**
1245  * update internal scheduler state to reflect completion of a work unit.
1246  *
1247  * @param[in]  node    salvage queue node object pointer
1248  * @param[in]  result  worker process result code
1249  *
1250  * @post scheduler state is updated.
1251  *
1252  * @internal
1253  */
1254 static void
1255 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1256 {
1257     afs_int32 partid;
1258     int idx;
1259
1260     DeleteFromPendingQueue(node);
1261     partid = node->partition_id;
1262     if (partid >=0 && partid <= VOLMAXPARTS) {
1263         partition_salvaging[partid]--;
1264     }
1265     if (result == 0) {
1266         node->state = SALVSYNC_STATE_DONE;
1267     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1268         node->state = SALVSYNC_STATE_ERROR;
1269     }
1270
1271     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1272         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1273             if (node->volgroup.children[idx]) {
1274                 node->volgroup.children[idx]->state = node->state;
1275             }
1276         }
1277     }
1278 }
1279
1280 /**
1281  * check whether worker child failed.
1282  *
1283  * @param[in] status  status bitfield return by wait()
1284  *
1285  * @return boolean failure code
1286  *    @retval 0 child succeeded
1287  *    @retval 1 child failed
1288  *
1289  * @internal
1290  */
1291 static int
1292 ChildFailed(int status)
1293 {
1294     return (WCOREDUMP(status) ||
1295             WIFSIGNALED(status) ||
1296             ((WEXITSTATUS(status) != 0) &&
1297              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1298 }
1299
1300
1301 /**
1302  * notify salvsync scheduler of node completion, by child pid.
1303  *
1304  * @param[in]  pid     pid of worker child
1305  * @param[in]  status  worker status bitfield from wait()
1306  *
1307  * @post scheduler state is updated.
1308  *       if status code is a failure, fileserver notification was attempted
1309  *
1310  * @see SALVSYNC_doneWork_r
1311  */
1312 void
1313 SALVSYNC_doneWorkByPid(int pid, int status)
1314 {
1315     struct SalvageQueueNode * node;
1316     char partName[16];
1317     VolumeId volids[VOLMAXTYPES+1];
1318     unsigned int idx;
1319
1320     memset(volids, 0, sizeof(volids));
1321
1322     VOL_LOCK;
1323     node = LookupPendingCommandByPid(pid);
1324     if (node != NULL) {
1325         SALVSYNC_doneWork_r(node, status);
1326
1327         if (ChildFailed(status)) {
1328             /* populate volume id list for later processing outside the glock */
1329             volids[0] = node->command.sop.volume;
1330             strcpy(partName, node->command.sop.partName);
1331             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1332                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1333                     if (node->volgroup.children[idx]) {
1334                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1335                     }
1336                 }
1337             }
1338         }
1339     }
1340     VOL_UNLOCK;
1341
1342     /*
1343      * if necessary, notify fileserver of
1344      * failure to salvage volume group
1345      * [we cannot guarantee that the child made the
1346      *  appropriate notifications (e.g. SIGSEGV)]
1347      *  -- tkeiser 11/28/2007
1348      */
1349     if (ChildFailed(status)) {
1350         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1351             if (volids[idx]) {
1352                 FSYNC_VolOp(volids[idx],
1353                             partName,
1354                             FSYNC_VOL_FORCE_ERROR,
1355                             FSYNC_WHATEVER,
1356                             NULL);
1357             }
1358         }
1359     }
1360 }
1361
1362 #endif /* AFS_DEMAND_ATTACH_FS */