Turn on --enable-bos-new-config unconditionally
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29
30 #include <sys/types.h>
31 #include <stdio.h>
32 #ifdef AFS_NT40_ENV
33 #include <winsock2.h>
34 #include <time.h>
35 #else
36 #include <sys/param.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 #include <netdb.h>
40 #include <sys/time.h>
41 #endif
42 #include <errno.h>
43 #include <assert.h>
44 #include <signal.h>
45 #include <string.h>
46
47
48 #include <rx/xdr.h>
49 #include <afs/afsint.h>
50 #include "nfs.h"
51 #include <afs/errors.h>
52 #include "salvsync.h"
53 #include "lwp.h"
54 #include "lock.h"
55 #include <afs/afssyscalls.h>
56 #include "ihandle.h"
57 #include "vnode.h"
58 #include "volume.h"
59 #include "partition.h"
60 #include <rx/rx_queue.h>
61 #include <afs/procmgmt.h>
62
63 #if !defined(offsetof)
64 #include <stddef.h>
65 #endif
66
67 #ifdef USE_UNIX_SOCKETS
68 #include <afs/afsutil.h>
69 #include <sys/un.h>
70 #endif
71
72
73 /*@printflike@*/ extern void Log(const char *format, ...);
74
75 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
76                                  * move = dump+restore can run on single server */
77
78
79 /*
80  * This lock controls access to the handler array.
81  */
82 struct Lock SALVSYNC_handler_lock;
83
84
85 #ifdef AFS_DEMAND_ATTACH_FS
86 /*
87  * SALVSYNC is a feature specific to the demand attach fileserver
88  */
89
90 /* Forward declarations */
91 static void * SALVSYNC_syncThread(void *);
92 static void SALVSYNC_newconnection(osi_socket fd);
93 static void SALVSYNC_com(osi_socket fd);
94 static void SALVSYNC_Drop(osi_socket fd);
95 static void AcceptOn(void);
96 static void AcceptOff(void);
97 static void InitHandler(void);
98 static void CallHandler(fd_set * fdsetp);
99 static int AddHandler(osi_socket afd, void (*aproc) (int));
100 static int FindHandler(register osi_socket afd);
101 static int FindHandler_r(register osi_socket afd);
102 static int RemoveHandler(register osi_socket afd);
103 static void GetHandler(fd_set * fdsetp, int *maxfdp);
104
105 static int AllocNode(struct SalvageQueueNode ** node);
106
107 static int AddToSalvageQueue(struct SalvageQueueNode * node);
108 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
109 static void AddToPendingQueue(struct SalvageQueueNode * node);
110 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
111 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
112 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
113 static void UpdateCommandPrio(struct SalvageQueueNode * node);
114 static void HandlePrio(struct SalvageQueueNode * clone, 
115                        struct SalvageQueueNode * parent,
116                        afs_uint32 new_prio);
117
118 static int LinkNode(struct SalvageQueueNode * parent,
119                     struct SalvageQueueNode * clone);
120
121 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName, 
122                                             struct SalvageQueueNode ** parent);
123 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
124                                                      struct SalvageQueueNode ** parent);
125 static void AddNodeToHash(struct SalvageQueueNode * node);
126 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
127
128 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
129 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
130 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
131 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
132 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
133
134
135 extern int LogLevel;
136 extern int VInit;
137 extern pthread_mutex_t vol_salvsync_mutex;
138
139 /**
140  * salvsync server socket handle.
141  */
142 static SYNC_server_state_t salvsync_server_state = 
143     { -1,                       /* file descriptor */
144       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
145       SALVSYNC_PROTO_VERSION,   /* protocol version */
146       5,                        /* bind() retry limit */
147       100,                      /* listen() queue depth */
148       "SALVSYNC",               /* protocol name string */
149     };
150
151
152 /**
153  * queue of all volumes waiting to be salvaged.
154  */
155 struct SalvageQueue {
156     volatile int total_len;
157     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
158     volatile int len[VOLMAXPARTS+1];
159     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
160     pthread_cond_t cv;
161 };
162 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
163
164 /**
165  * queue of all volumes currently being salvaged.
166  */
167 struct QueueHead {
168     volatile struct rx_queue q;  /**< queue of salvages in progress */
169     volatile int len;            /**< length of in-progress queue */
170     pthread_cond_t queue_change_cv;
171 };
172 static struct QueueHead pendingQueue;  /* volumes being salvaged */
173
174 /* XXX
175  * whether a partition has a salvage in progress
176  *
177  * the salvager code only permits one salvage per partition at a time
178  *
179  * the following hack tries to keep salvaged parallelism high by
180  * only permitting one salvage dispatch per partition at a time
181  *
182  * unfortunately, the parallel salvager currently
183  * has a rather braindead routine that won't permit
184  * multiple salvages on the same "device".  this
185  * function happens to break pretty badly on lvm, raid luns, etc.
186  *
187  * this hack isn't good enough to stop the device limiting code from
188  * crippling performance.  someday that code needs to be rewritten
189  */
190 static int partition_salvaging[VOLMAXPARTS+1];
191
192 static int HandlerFD[MAXHANDLERS];
193 static void (*HandlerProc[MAXHANDLERS]) (int);
194
195 #define VSHASH_SIZE 64
196 #define VSHASH_MASK (VSHASH_SIZE-1)
197 #define VSHASH(vid) ((vid)&VSHASH_MASK)
198
199 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
200
201 static struct SalvageQueueNode *
202 LookupNode(afs_uint32 vid, char * partName,
203            struct SalvageQueueNode ** parent)
204 {
205     struct rx_queue *qp, *nqp;
206     struct SalvageQueueNode *vsp;
207     int idx = VSHASH(vid);
208
209     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
210         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
211         if ((vsp->command.sop.volume == vid) &&
212             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
213             break;
214         }
215     }
216
217     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
218         vsp = NULL;
219     }
220
221     if (parent) {
222         if (vsp) {
223             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
224                 vsp->volgroup.parent : vsp;
225         } else {
226             *parent = NULL;
227         }
228     }
229
230     return vsp;
231 }
232
233 static struct SalvageQueueNode *
234 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
235                     struct SalvageQueueNode ** parent)
236 {
237     return LookupNode(qry->volume, qry->partName, parent);
238 }
239
240 static void
241 AddNodeToHash(struct SalvageQueueNode * node)
242 {
243     int idx = VSHASH(node->command.sop.volume);
244
245     if (queue_IsOnQueue(&node->hash_chain)) {
246         return;
247     }
248
249     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
250     SalvageHashTable[idx].len++;
251 }
252
253 static void
254 DeleteNodeFromHash(struct SalvageQueueNode * node)
255 {
256     int idx = VSHASH(node->command.sop.volume);
257
258     if (queue_IsNotOnQueue(&node->hash_chain)) {
259         return;
260     }
261
262     queue_Remove(&node->hash_chain);
263     SalvageHashTable[idx].len--;
264 }
265
266 void
267 SALVSYNC_salvInit(void)
268 {
269     int i;
270     pthread_t tid;
271     pthread_attr_t tattr;
272
273     /* initialize the queues */
274     Lock_Init(&SALVSYNC_handler_lock);
275     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
276     for (i = 0; i <= VOLMAXPARTS; i++) {
277         queue_Init(&salvageQueue.part[i]);
278         salvageQueue.len[i] = 0;
279     }
280     assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
281     queue_Init(&pendingQueue);
282     salvageQueue.total_len = pendingQueue.len = 0;
283     salvageQueue.last_insert = -1;
284     memset(partition_salvaging, 0, sizeof(partition_salvaging));
285
286     for (i = 0; i < VSHASH_SIZE; i++) {
287         assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
288         SalvageHashTable[i].len = 0;
289         queue_Init(&SalvageHashTable[i]);
290     }
291
292     /* start the salvsync thread */
293     assert(pthread_attr_init(&tattr) == 0);
294     assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
295     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
296 }
297
298 static void
299 CleanFDs(void)
300 {
301     int i;
302     for (i = 0; i < MAXHANDLERS; ++i) {
303         if (HandlerFD[i] >= 0) {
304             SALVSYNC_Drop(HandlerFD[i]);
305         }
306     }
307
308     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
309      * have a handler */
310     close(salvsync_server_state.fd);
311     salvsync_server_state.fd = -1;
312 }
313
314 static fd_set SALVSYNC_readfds;
315
316 static void *
317 SALVSYNC_syncThread(void * args)
318 {
319     int on = 1;
320     int code;
321     int numTries;
322     int tid;
323     SYNC_server_state_t * state = &salvsync_server_state;
324
325     /* when we fork, the child needs to close the salvsync server sockets,
326      * otherwise, it may get salvsync requests, instead of the parent
327      * salvageserver */
328     assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
329
330     SYNC_getAddr(&state->endpoint, &state->addr);
331     SYNC_cleanupSock(state);
332
333 #ifndef AFS_NT40_ENV
334     (void)signal(SIGPIPE, SIG_IGN);
335 #endif
336
337     state->fd = SYNC_getSock(&state->endpoint);
338     code = SYNC_bindSock(state);
339     assert(!code);
340
341     InitHandler();
342     AcceptOn();
343
344     for (;;) {
345         int maxfd;
346         GetHandler(&SALVSYNC_readfds, &maxfd);
347         /* Note: check for >= 1 below is essential since IOMGR_select
348          * doesn't have exactly same semantics as select.
349          */
350         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
351             CallHandler(&SALVSYNC_readfds);
352     }
353
354     return NULL;
355 }
356
357 static void
358 SALVSYNC_newconnection(int afd)
359 {
360 #ifdef USE_UNIX_SOCKETS
361     struct sockaddr_un other;
362 #else  /* USE_UNIX_SOCKETS */
363     struct sockaddr_in other;
364 #endif
365     int junk, fd;
366     junk = sizeof(other);
367     fd = accept(afd, (struct sockaddr *)&other, &junk);
368     if (fd == -1) {
369         Log("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
370         assert(1 == 2);
371     } else if (!AddHandler(fd, SALVSYNC_com)) {
372         AcceptOff();
373         assert(AddHandler(fd, SALVSYNC_com));
374     }
375 }
376
377 /* this function processes commands from an salvsync file descriptor (fd) */
378 static afs_int32 SALV_cnt = 0;
379 static void
380 SALVSYNC_com(osi_socket fd)
381 {
382     SYNC_command com;
383     SYNC_response res;
384     SALVSYNC_response_hdr sres_hdr;
385     SALVSYNC_command scom;
386     SALVSYNC_response sres;
387     SYNC_PROTO_BUF_DECL(buf);
388
389     memset(&com, 0, sizeof(com));
390     memset(&res, 0, sizeof(res));
391     memset(&scom, 0, sizeof(scom));
392     memset(&sres, 0, sizeof(sres));
393     memset(&sres_hdr, 0, sizeof(sres));
394     
395     com.payload.buf = (void *)buf;
396     com.payload.len = SYNC_PROTO_MAX_LEN;
397     res.payload.buf = (void *) &sres_hdr;
398     res.payload.len = sizeof(sres_hdr);
399     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
400     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
401
402     scom.hdr = &com.hdr;
403     scom.sop = (SALVSYNC_command_hdr *) buf;
404     scom.com = &com;
405     sres.hdr = &res.hdr;
406     sres.sop = &sres_hdr;
407     sres.res = &res;
408
409     SALV_cnt++;
410     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
411         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
412         SALVSYNC_Drop(fd);
413         return;
414     }
415
416     if (com.recv_len < sizeof(com.hdr)) {
417         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
418         res.hdr.response = SYNC_COM_ERROR;
419         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
420         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
421         goto respond;
422     }
423
424     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
425         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
426         res.hdr.response = SYNC_COM_ERROR;
427         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
428         goto respond;
429     }
430
431     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
432         res.hdr.response = SYNC_OK;
433         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
434         goto respond;
435     }
436
437     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
438         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
439         res.hdr.response = SYNC_COM_ERROR;
440         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
441         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
442         goto respond;
443     }
444
445     res.hdr.com_seq = com.hdr.com_seq;
446
447     VOL_LOCK;
448     switch (com.hdr.command) {
449     case SALVSYNC_NOP:
450         break;
451     case SALVSYNC_SALVAGE:
452     case SALVSYNC_RAISEPRIO:
453         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
454         break;
455     case SALVSYNC_CANCEL:
456         /* cancel a salvage */
457         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
458         break;
459     case SALVSYNC_CANCELALL:
460         /* cancel all queued salvages */
461         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
462         break;
463     case SALVSYNC_QUERY:
464         /* query whether a volume is done salvaging */
465         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
466         break;
467     case SALVSYNC_OP_LINK:
468         /* link a clone to its parent in the scheduler */
469         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
470         break;
471     default:
472         res.hdr.response = SYNC_BAD_COMMAND;
473         break;
474     }
475
476     sres_hdr.sq_len = salvageQueue.total_len;
477     sres_hdr.pq_len = pendingQueue.len;
478     VOL_UNLOCK;
479
480  respond:
481     SYNC_putRes(&salvsync_server_state, fd, &res);
482     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
483         SALVSYNC_Drop(fd);
484     }
485 }
486
487 /**
488  * request that a volume be salvaged.
489  *
490  * @param[in]  com  inbound command object
491  * @param[out] res  outbound response object
492  *
493  * @return operation status
494  *    @retval SYNC_OK success
495  *    @retval SYNC_DENIED failed to enqueue request
496  *    @retval SYNC_FAILED malformed command packet
497  *
498  * @note this is a SALVSYNC protocol rpc handler
499  *
500  * @internal
501  *
502  * @post the volume is enqueued in the to-be-salvaged queue.  
503  *       if the volume was already in the salvage queue, its 
504  *       priority (and thus its location in the queue) are 
505  *       updated.
506  */
507 static afs_int32
508 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
509 {
510     afs_int32 code = SYNC_OK;
511     struct SalvageQueueNode * node, * clone;
512     int hash = 0;
513
514     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
515         code = SYNC_FAILED;
516         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
517         goto done;
518     }
519
520     clone = LookupNodeByCommand(com->sop, &node);
521
522     if (node == NULL) {
523         if (AllocNode(&node)) {
524             code = SYNC_DENIED;
525             res->hdr->reason = SYNC_REASON_NOMEM;
526             goto done;
527         }
528         clone = node;
529         hash = 1;
530     }
531
532     HandlePrio(clone, node, com->sop->prio);
533
534     switch (node->state) {
535     case SALVSYNC_STATE_QUEUED:
536         UpdateCommandPrio(node);
537         break;
538
539     case SALVSYNC_STATE_ERROR:
540     case SALVSYNC_STATE_DONE:
541     case SALVSYNC_STATE_UNKNOWN:
542         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
543         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
544
545         /* 
546          * make sure volgroup parent partition path is kept coherent
547          *
548          * If we ever want to support non-COW clones on a machine holding
549          * the RW site, please note that this code does not work under the
550          * conditions where someone zaps a COW clone on partition X, and
551          * subsequently creates a full clone on partition Y -- we'd need
552          * an inverse to SALVSYNC_com_Link.
553          *  -- tkeiser 11/28/2007
554          */
555         strcpy(node->command.sop.partName, com->sop->partName);
556
557         if (AddToSalvageQueue(node)) {
558             code = SYNC_DENIED;
559         }
560         break;
561
562     default:
563         break;
564     }
565
566     if (hash) {
567         AddNodeToHash(node);
568     }
569
570     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
571     res->sop->state = node->state;
572     res->sop->prio = node->command.sop.prio;
573
574  done:
575     return code;
576 }
577
578 /**
579  * cancel a pending salvage request.
580  *
581  * @param[in]  com  inbound command object
582  * @param[out] res  outbound response object
583  *
584  * @return operation status
585  *    @retval SYNC_OK success
586  *    @retval SYNC_FAILED malformed command packet
587  *
588  * @note this is a SALVSYNC protocol rpc handler
589  *
590  * @internal
591  */
592 static afs_int32
593 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
594 {
595     afs_int32 code = SYNC_OK;
596     struct SalvageQueueNode * node;
597
598     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
599         code = SYNC_FAILED;
600         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
601         goto done;
602     }
603
604     node = LookupNodeByCommand(com->sop, NULL);
605
606     if (node == NULL) {
607         res->sop->state = SALVSYNC_STATE_UNKNOWN;
608         res->sop->prio = 0;
609     } else {
610         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
611         res->sop->prio = node->command.sop.prio;
612         res->sop->state = node->state;
613         if ((node->type == SALVSYNC_VOLGROUP_PARENT) && 
614             (node->state == SALVSYNC_STATE_QUEUED)) {
615             DeleteFromSalvageQueue(node);
616         }
617     }
618
619  done:
620     return code;
621 }
622
623 /**
624  * cancel all pending salvage requests.
625  *
626  * @param[in]  com  incoming command object
627  * @param[out] res  outbound response object
628  *
629  * @return operation status
630  *    @retval SYNC_OK success
631  *
632  * @note this is a SALVSYNC protocol rpc handler
633  *
634  * @internal
635  */
636 static afs_int32
637 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
638 {
639     struct SalvageQueueNode * np, *nnp;
640     struct DiskPartition64 * dp;
641
642     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
643         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
644             DeleteFromSalvageQueue(np);
645         }
646     }
647
648     return SYNC_OK;
649 }
650
651 /**
652  * link a queue node for a clone to its parent volume.
653  *
654  * @param[in]  com   inbound command object
655  * @param[out] res   outbound response object
656  *
657  * @return operation status
658  *    @retval SYNC_OK success
659  *    @retval SYNC_FAILED malformed command packet
660  *    @retval SYNC_DENIED the request could not be completed
661  *
662  * @note this is a SALVSYNC protocol rpc handler
663  *
664  * @post the requested volume is marked as a child of another volume.
665  *       thus, future salvage requests for this volume will result in the
666  *       parent of the volume group being scheduled for salvage instead
667  *       of this clone.
668  *
669  * @internal
670  */
671 static afs_int32
672 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
673 {
674     afs_int32 code = SYNC_OK;
675     struct SalvageQueueNode * clone, * parent;
676
677     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
678         code = SYNC_FAILED;
679         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
680         goto done;
681     }
682
683     /* lookup clone's salvage scheduling node */
684     clone = LookupNodeByCommand(com->sop, NULL);
685     if (clone == NULL) {
686         code = SYNC_DENIED;
687         res->hdr->reason = SALVSYNC_REASON_ERROR;
688         goto done;
689     }
690
691     /* lookup parent's salvage scheduling node */
692     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
693     if (parent == NULL) {
694         if (AllocNode(&parent)) {
695             code = SYNC_DENIED;
696             res->hdr->reason = SYNC_REASON_NOMEM;
697             goto done;
698         }
699         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
700         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
701         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
702         AddNodeToHash(parent);
703     }
704
705     if (LinkNode(parent, clone)) {
706         code = SYNC_DENIED;
707         goto done;
708     }
709
710  done:
711     return code;
712 }
713
714 /**
715  * query the status of a volume salvage request.
716  *
717  * @param[in]  com   inbound command object
718  * @param[out] res   outbound response object
719  *
720  * @return operation status
721  *    @retval SYNC_OK success
722  *    @retval SYNC_FAILED malformed command packet
723  *
724  * @note this is a SALVSYNC protocol rpc handler
725  *
726  * @internal
727  */
728 static afs_int32
729 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
730 {
731     afs_int32 code = SYNC_OK;
732     struct SalvageQueueNode * node;
733
734     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
735         code = SYNC_FAILED;
736         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
737         goto done;
738     }
739
740     LookupNodeByCommand(com->sop, &node);
741
742     /* query whether a volume is done salvaging */
743     if (node == NULL) {
744         res->sop->state = SALVSYNC_STATE_UNKNOWN;
745         res->sop->prio = 0;
746     } else {
747         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
748         res->sop->state = node->state;
749         res->sop->prio = node->command.sop.prio;
750     }
751
752  done:
753     return code;
754 }
755
756 static void
757 SALVSYNC_Drop(osi_socket fd)
758 {
759     RemoveHandler(fd);
760 #ifdef AFS_NT40_ENV
761     closesocket(fd);
762 #else
763     close(fd);
764 #endif
765     AcceptOn();
766 }
767
768 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
769
770 static void
771 AcceptOn(void)
772 {
773     if (AcceptHandler == -1) {
774         assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
775         AcceptHandler = FindHandler(salvsync_server_state.fd);
776     }
777 }
778
779 static void
780 AcceptOff(void)
781 {
782     if (AcceptHandler != -1) {
783         assert(RemoveHandler(salvsync_server_state.fd));
784         AcceptHandler = -1;
785     }
786 }
787
788 /* The multiple FD handling code. */
789
790 static void
791 InitHandler(void)
792 {
793     register int i;
794     ObtainWriteLock(&SALVSYNC_handler_lock);
795     for (i = 0; i < MAXHANDLERS; i++) {
796         HandlerFD[i] = -1;
797         HandlerProc[i] = NULL;
798     }
799     ReleaseWriteLock(&SALVSYNC_handler_lock);
800 }
801
802 static void
803 CallHandler(fd_set * fdsetp)
804 {
805     register int i;
806     ObtainReadLock(&SALVSYNC_handler_lock);
807     for (i = 0; i < MAXHANDLERS; i++) {
808         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
809             ReleaseReadLock(&SALVSYNC_handler_lock);
810             (*HandlerProc[i]) (HandlerFD[i]);
811             ObtainReadLock(&SALVSYNC_handler_lock);
812         }
813     }
814     ReleaseReadLock(&SALVSYNC_handler_lock);
815 }
816
817 static int
818 AddHandler(osi_socket afd, void (*aproc) (int))
819 {
820     register int i;
821     ObtainWriteLock(&SALVSYNC_handler_lock);
822     for (i = 0; i < MAXHANDLERS; i++)
823         if (HandlerFD[i] == -1)
824             break;
825     if (i >= MAXHANDLERS) {
826         ReleaseWriteLock(&SALVSYNC_handler_lock);
827         return 0;
828     }
829     HandlerFD[i] = afd;
830     HandlerProc[i] = aproc;
831     ReleaseWriteLock(&SALVSYNC_handler_lock);
832     return 1;
833 }
834
835 static int
836 FindHandler(register osi_socket afd)
837 {
838     register int i;
839     ObtainReadLock(&SALVSYNC_handler_lock);
840     for (i = 0; i < MAXHANDLERS; i++)
841         if (HandlerFD[i] == afd) {
842             ReleaseReadLock(&SALVSYNC_handler_lock);
843             return i;
844         }
845     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
846     assert(1 == 2);
847     return -1;                  /* satisfy compiler */
848 }
849
850 static int
851 FindHandler_r(register osi_socket afd)
852 {
853     register int i;
854     for (i = 0; i < MAXHANDLERS; i++)
855         if (HandlerFD[i] == afd) {
856             return i;
857         }
858     assert(1 == 2);
859     return -1;                  /* satisfy compiler */
860 }
861
862 static int
863 RemoveHandler(register osi_socket afd)
864 {
865     ObtainWriteLock(&SALVSYNC_handler_lock);
866     HandlerFD[FindHandler_r(afd)] = -1;
867     ReleaseWriteLock(&SALVSYNC_handler_lock);
868     return 1;
869 }
870
871 static void
872 GetHandler(fd_set * fdsetp, int *maxfdp)
873 {
874     register int i;
875     register int maxfd = -1;
876     FD_ZERO(fdsetp);
877     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
878     for (i = 0; i < MAXHANDLERS; i++)
879         if (HandlerFD[i] != -1) {
880             FD_SET(HandlerFD[i], fdsetp);
881             if (maxfd < HandlerFD[i])
882                 maxfd = HandlerFD[i];
883         }
884     *maxfdp = maxfd;
885     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
886 }
887
888 /**
889  * allocate a salvage queue node.
890  *
891  * @param[out] node_out  address in which to store new node pointer
892  *
893  * @return operation status
894  *    @retval 0 success
895  *    @retval 1 failed to allocate node
896  *
897  * @internal
898  */
899 static int
900 AllocNode(struct SalvageQueueNode ** node_out)
901 {
902     int code = 0;
903     struct SalvageQueueNode * node;
904
905     *node_out = node = (struct SalvageQueueNode *) 
906         malloc(sizeof(struct SalvageQueueNode));
907     if (node == NULL) {
908         code = 1;
909         goto done;
910     }
911
912     memset(node, 0, sizeof(struct SalvageQueueNode));
913     node->type = SALVSYNC_VOLGROUP_PARENT;
914     node->state = SALVSYNC_STATE_UNKNOWN;
915
916  done:
917     return code;
918 }
919
920 /**
921  * link a salvage queue node to its parent.
922  *
923  * @param[in] parent  pointer to queue node for parent of volume group
924  * @param[in] clone   pointer to queue node for a clone
925  *
926  * @return operation status
927  *    @retval 0 success
928  *    @retval 1 failure
929  *
930  * @internal
931  */
932 static int
933 LinkNode(struct SalvageQueueNode * parent,
934          struct SalvageQueueNode * clone)
935 {
936     int code = 0;
937     int idx;
938
939     /* check for attaching a clone to a clone */
940     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
941         code = 1;
942         goto done;
943     }
944
945     /* check for pre-existing registration and openings */
946     for (idx = 0; idx < VOLMAXTYPES; idx++) {
947         if (parent->volgroup.children[idx] == clone) {
948             goto linked;
949         }
950         if (parent->volgroup.children[idx] == NULL) {
951             break;
952         }
953     }
954     if (idx == VOLMAXTYPES) {
955         code = 1;
956         goto done;
957     }
958
959     /* link parent and child */
960     parent->volgroup.children[idx] = clone;
961     clone->type = SALVSYNC_VOLGROUP_CLONE;
962     clone->volgroup.parent = parent;
963
964
965  linked:
966     switch (clone->state) {
967     case SALVSYNC_STATE_QUEUED:
968         DeleteFromSalvageQueue(clone);
969
970     case SALVSYNC_STATE_SALVAGING:
971         switch (parent->state) {
972         case SALVSYNC_STATE_UNKNOWN:
973         case SALVSYNC_STATE_ERROR:
974         case SALVSYNC_STATE_DONE:
975             parent->command.sop.prio = clone->command.sop.prio;
976             AddToSalvageQueue(parent);
977             break;
978
979         case SALVSYNC_STATE_QUEUED:
980             if (clone->command.sop.prio) {
981                 parent->command.sop.prio += clone->command.sop.prio;
982                 UpdateCommandPrio(parent);
983             }
984             break;
985
986         default:
987             break;
988         }
989         break;
990
991     default:
992         break;
993     }
994
995  done:
996     return code;
997 }
998
999 static void
1000 HandlePrio(struct SalvageQueueNode * clone, 
1001            struct SalvageQueueNode * node,
1002            afs_uint32 new_prio)
1003 {
1004     afs_uint32 delta;
1005
1006     switch (node->state) {
1007     case SALVSYNC_STATE_ERROR:
1008     case SALVSYNC_STATE_DONE:
1009     case SALVSYNC_STATE_UNKNOWN:
1010         node->command.sop.prio = 0;
1011         break;
1012     }
1013
1014     if (new_prio < clone->command.sop.prio) {
1015         /* strange. let's just set our delta to 1 */
1016         delta = 1;
1017     } else {
1018         delta = new_prio - clone->command.sop.prio;
1019     }
1020
1021     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1022         clone->command.sop.prio = new_prio;
1023     }
1024
1025     node->command.sop.prio += delta;
1026 }
1027
1028 static int
1029 AddToSalvageQueue(struct SalvageQueueNode * node)
1030 {
1031     afs_int32 id;
1032     struct SalvageQueueNode * last = NULL;
1033
1034     id = volutil_GetPartitionID(node->command.sop.partName);
1035     if (id < 0 || id > VOLMAXPARTS) {
1036         return 1;
1037     }
1038     if (!VGetPartitionById_r(id, 0)) {
1039         /* don't enqueue salvage requests for unmounted partitions */
1040         return 1;
1041     }
1042     if (queue_IsOnQueue(node)) {
1043         return 0;
1044     }
1045
1046     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1047         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1048     }
1049     queue_Append(&salvageQueue.part[id], node);
1050     salvageQueue.len[id]++;
1051     salvageQueue.total_len++;
1052     salvageQueue.last_insert = id;
1053     node->partition_id = id;
1054     node->state = SALVSYNC_STATE_QUEUED;
1055
1056     /* reorder, if necessary */
1057     if (last && last->command.sop.prio < node->command.sop.prio) {
1058         UpdateCommandPrio(node);
1059     }
1060
1061     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1062     return 0;
1063 }
1064
1065 static void
1066 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1067 {
1068     if (queue_IsOnQueue(node)) {
1069         queue_Remove(node);
1070         salvageQueue.len[node->partition_id]--;
1071         salvageQueue.total_len--;
1072         node->state = SALVSYNC_STATE_UNKNOWN;
1073         assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1074     }
1075 }
1076
1077 static void
1078 AddToPendingQueue(struct SalvageQueueNode * node)
1079 {
1080     queue_Append(&pendingQueue, node);
1081     pendingQueue.len++;
1082     node->state = SALVSYNC_STATE_SALVAGING;
1083     assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1084 }
1085
1086 static void
1087 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1088 {
1089     if (queue_IsOnQueue(node)) {
1090         queue_Remove(node);
1091         pendingQueue.len--;
1092         node->state = SALVSYNC_STATE_UNKNOWN;
1093         assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1094     }
1095 }
1096
1097 static struct SalvageQueueNode *
1098 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1099 {
1100     struct SalvageQueueNode * np, * nnp;
1101
1102     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1103         if ((np->command.sop.volume == qry->volume) && 
1104             !strncmp(np->command.sop.partName, qry->partName,
1105                      sizeof(qry->partName)))
1106             break;
1107     }
1108
1109     if (queue_IsEnd(&pendingQueue, np))
1110         np = NULL;
1111     return np;
1112 }
1113
1114 static struct SalvageQueueNode *
1115 LookupPendingCommandByPid(int pid)
1116 {
1117     struct SalvageQueueNode * np, * nnp;
1118
1119     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1120         if (np->pid == pid)
1121             break;
1122     }
1123
1124     if (queue_IsEnd(&pendingQueue, np))
1125         np = NULL;
1126     return np;
1127 }
1128
1129
1130 /* raise the priority of a previously scheduled salvage */
1131 static void
1132 UpdateCommandPrio(struct SalvageQueueNode * node)
1133 {
1134     struct SalvageQueueNode *np, *nnp;
1135     afs_int32 id;
1136     afs_uint32 prio;
1137
1138     assert(queue_IsOnQueue(node));
1139
1140     prio = node->command.sop.prio;
1141     id = node->partition_id;
1142     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1143         queue_Remove(node);
1144         queue_Prepend(&salvageQueue.part[id], node);
1145     } else {
1146         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1147             if (np->command.sop.prio > prio)
1148                 break;
1149         }
1150         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1151             queue_Remove(node);
1152             queue_Prepend(&salvageQueue.part[id], node);
1153         } else if (node != np) {
1154             queue_Remove(node);
1155             queue_InsertAfter(np, node);
1156         }
1157     }
1158 }
1159
1160 /* this will need to be rearchitected if we ever want more than one thread
1161  * to wait for new salvage nodes */
1162 struct SalvageQueueNode * 
1163 SALVSYNC_getWork(void)
1164 {
1165     int i, ret;
1166     struct DiskPartition64 * dp = NULL, * fdp;
1167     static afs_int32 next_part_sched = 0;
1168     struct SalvageQueueNode *node = NULL, *np;
1169
1170     VOL_LOCK;
1171
1172     /*
1173      * wait for work to be scheduled
1174      * if there are no disk partitions, just sit in this wait loop forever
1175      */
1176     while (!salvageQueue.total_len || !DiskPartitionList) {
1177         VOL_CV_WAIT(&salvageQueue.cv);
1178     }
1179
1180     /* 
1181      * short circuit for simple case where only one partition has
1182      * scheduled salvages
1183      */
1184     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1185         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1186         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1187         goto have_node;
1188     }
1189
1190
1191     /* 
1192      * ok, more than one partition has scheduled salvages.
1193      * now search for partitions with scheduled salvages, but no pending salvages. 
1194      */
1195     dp = VGetPartitionById_r(next_part_sched, 0);
1196     if (!dp) {
1197         dp = DiskPartitionList;
1198     }
1199     fdp = dp;
1200
1201     for (i=0 ; 
1202          !i || dp != fdp ; 
1203          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1204         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1205             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1206             goto have_node;
1207         }
1208     }
1209
1210
1211     /*
1212      * all partitions with scheduled salvages have at least one pending.
1213      * now do an exhaustive search for a scheduled salvage.
1214      */
1215     dp = fdp;
1216
1217     for (i=0 ; 
1218          !i || dp != fdp ; 
1219          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1220         if (salvageQueue.len[dp->index]) {
1221             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1222             goto have_node;
1223         }
1224     }
1225
1226     /* we should never reach this line */
1227     assert(1==2);
1228
1229  have_node:
1230     assert(node != NULL);
1231     node->pid = 0;
1232     partition_salvaging[node->partition_id]++;
1233     DeleteFromSalvageQueue(node);
1234     AddToPendingQueue(node);
1235
1236     if (dp) {
1237         /* update next_part_sched field */
1238         if (dp->next) {
1239             next_part_sched = dp->next->index;
1240         } else if (DiskPartitionList) {
1241             next_part_sched = DiskPartitionList->index;
1242         } else {
1243             next_part_sched = -1;
1244         }
1245     }
1246
1247  bail:
1248     VOL_UNLOCK;
1249     return node;
1250 }
1251
1252 /**
1253  * update internal scheduler state to reflect completion of a work unit.
1254  *
1255  * @param[in]  node    salvage queue node object pointer
1256  * @param[in]  result  worker process result code
1257  *
1258  * @post scheduler state is updated.
1259  *
1260  * @internal
1261  */
1262 static void
1263 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1264 {
1265     afs_int32 partid;
1266     int idx;
1267
1268     DeleteFromPendingQueue(node);
1269     partid = node->partition_id;
1270     if (partid >=0 && partid <= VOLMAXPARTS) {
1271         partition_salvaging[partid]--;
1272     }
1273     if (result == 0) {
1274         node->state = SALVSYNC_STATE_DONE;
1275     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1276         node->state = SALVSYNC_STATE_ERROR;
1277     }
1278
1279     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1280         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1281             if (node->volgroup.children[idx]) {
1282                 node->volgroup.children[idx]->state = node->state;
1283             }
1284         }
1285     }
1286 }
1287
1288 /**
1289  * check whether worker child failed.
1290  *
1291  * @param[in] status  status bitfield return by wait()
1292  *
1293  * @return boolean failure code
1294  *    @retval 0 child succeeded
1295  *    @retval 1 child failed
1296  *
1297  * @internal
1298  */
1299 static int
1300 ChildFailed(int status)
1301 {
1302     return (WCOREDUMP(status) || 
1303             WIFSIGNALED(status) || 
1304             ((WEXITSTATUS(status) != 0) && 
1305              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1306 }
1307
1308
1309 /**
1310  * notify salvsync scheduler of node completion, by child pid.
1311  *
1312  * @param[in]  pid     pid of worker child
1313  * @param[in]  status  worker status bitfield from wait()
1314  *
1315  * @post scheduler state is updated.
1316  *       if status code is a failure, fileserver notification was attempted
1317  *
1318  * @see SALVSYNC_doneWork_r
1319  */
1320 void
1321 SALVSYNC_doneWorkByPid(int pid, int status)
1322 {
1323     struct SalvageQueueNode * node;
1324     char partName[16];
1325     afs_uint32 volids[VOLMAXTYPES+1];
1326     unsigned int idx;
1327
1328     memset(volids, 0, sizeof(volids));
1329
1330     VOL_LOCK;
1331     node = LookupPendingCommandByPid(pid);
1332     if (node != NULL) {
1333         SALVSYNC_doneWork_r(node, status);
1334
1335         if (ChildFailed(status)) {
1336             /* populate volume id list for later processing outside the glock */
1337             volids[0] = node->command.sop.volume;
1338             strcpy(partName, node->command.sop.partName);
1339             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1340                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1341                     if (node->volgroup.children[idx]) {
1342                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1343                     }
1344                 }
1345             }
1346         }
1347     }
1348     VOL_UNLOCK;
1349
1350     /*
1351      * if necessary, notify fileserver of
1352      * failure to salvage volume group
1353      * [we cannot guarantee that the child made the
1354      *  appropriate notifications (e.g. SIGSEGV)]
1355      *  -- tkeiser 11/28/2007
1356      */
1357     if (ChildFailed(status)) {
1358         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1359             if (volids[idx]) {
1360                 FSYNC_VolOp(volids[idx],
1361                             partName,
1362                             FSYNC_VOL_FORCE_ERROR,
1363                             FSYNC_WHATEVER,
1364                             NULL);
1365             }
1366         }
1367     }
1368 }
1369
1370 #endif /* AFS_DEMAND_ATTACH_FS */