Demand attach warning fixes
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29
30 #include <sys/types.h>
31 #include <stdio.h>
32 #ifdef AFS_NT40_ENV
33 #include <winsock2.h>
34 #include <time.h>
35 #else
36 #include <sys/param.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 #include <netdb.h>
40 #include <sys/time.h>
41 #endif
42 #include <errno.h>
43 #include <assert.h>
44 #include <signal.h>
45 #include <string.h>
46
47
48 #include <rx/xdr.h>
49 #include <afs/afsint.h>
50 #include "nfs.h"
51 #include <afs/errors.h>
52 #include "salvsync.h"
53 #include "lwp.h"
54 #include "lock.h"
55 #include <afs/afssyscalls.h>
56 #include "ihandle.h"
57 #include "vnode.h"
58 #include "volume.h"
59 #include "partition.h"
60 #include <rx/rx_queue.h>
61 #include <afs/procmgmt.h>
62
63 #if !defined(offsetof)
64 #include <stddef.h>
65 #endif
66
67 #ifdef USE_UNIX_SOCKETS
68 #include <afs/afsutil.h>
69 #include <sys/un.h>
70 #endif
71
72
73 /*@printflike@*/ extern void Log(const char *format, ...);
74
75 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
76                                  * move = dump+restore can run on single server */
77
78
79 /*
80  * This lock controls access to the handler array.
81  */
82 struct Lock SALVSYNC_handler_lock;
83
84
85 #ifdef AFS_DEMAND_ATTACH_FS
86 /*
87  * SALVSYNC is a feature specific to the demand attach fileserver
88  */
89
90 /* Forward declarations */
91 static void * SALVSYNC_syncThread(void *);
92 static void SALVSYNC_newconnection(osi_socket fd);
93 static void SALVSYNC_com(osi_socket fd);
94 static void SALVSYNC_Drop(osi_socket fd);
95 static void AcceptOn(void);
96 static void AcceptOff(void);
97 static void InitHandler(void);
98 static void CallHandler(fd_set * fdsetp);
99 static int AddHandler(osi_socket afd, void (*aproc) (int));
100 static int FindHandler(register osi_socket afd);
101 static int FindHandler_r(register osi_socket afd);
102 static int RemoveHandler(register osi_socket afd);
103 static void GetHandler(fd_set * fdsetp, int *maxfdp);
104
105 static int AllocNode(struct SalvageQueueNode ** node);
106
107 static int AddToSalvageQueue(struct SalvageQueueNode * node);
108 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
109 static void AddToPendingQueue(struct SalvageQueueNode * node);
110 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
111 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
112 static void UpdateCommandPrio(struct SalvageQueueNode * node);
113 static void HandlePrio(struct SalvageQueueNode * clone, 
114                        struct SalvageQueueNode * parent,
115                        afs_uint32 new_prio);
116
117 static int LinkNode(struct SalvageQueueNode * parent,
118                     struct SalvageQueueNode * clone);
119
120 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName, 
121                                             struct SalvageQueueNode ** parent);
122 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
123                                                      struct SalvageQueueNode ** parent);
124 static void AddNodeToHash(struct SalvageQueueNode * node);
125
126 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
127 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
128 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
129 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
130 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
131
132
133 extern int LogLevel;
134 extern int VInit;
135 extern pthread_mutex_t vol_salvsync_mutex;
136
137 /**
138  * salvsync server socket handle.
139  */
140 static SYNC_server_state_t salvsync_server_state = 
141     { -1,                       /* file descriptor */
142       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
143       SALVSYNC_PROTO_VERSION,   /* protocol version */
144       5,                        /* bind() retry limit */
145       100,                      /* listen() queue depth */
146       "SALVSYNC",               /* protocol name string */
147     };
148
149
150 /**
151  * queue of all volumes waiting to be salvaged.
152  */
153 struct SalvageQueue {
154     volatile int total_len;
155     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
156     volatile int len[VOLMAXPARTS+1];
157     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
158     pthread_cond_t cv;
159 };
160 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
161
162 /**
163  * queue of all volumes currently being salvaged.
164  */
165 struct QueueHead {
166     volatile struct rx_queue q;  /**< queue of salvages in progress */
167     volatile int len;            /**< length of in-progress queue */
168     pthread_cond_t queue_change_cv;
169 };
170 static struct QueueHead pendingQueue;  /* volumes being salvaged */
171
172 /* XXX
173  * whether a partition has a salvage in progress
174  *
175  * the salvager code only permits one salvage per partition at a time
176  *
177  * the following hack tries to keep salvaged parallelism high by
178  * only permitting one salvage dispatch per partition at a time
179  *
180  * unfortunately, the parallel salvager currently
181  * has a rather braindead routine that won't permit
182  * multiple salvages on the same "device".  this
183  * function happens to break pretty badly on lvm, raid luns, etc.
184  *
185  * this hack isn't good enough to stop the device limiting code from
186  * crippling performance.  someday that code needs to be rewritten
187  */
188 static int partition_salvaging[VOLMAXPARTS+1];
189
190 static int HandlerFD[MAXHANDLERS];
191 static void (*HandlerProc[MAXHANDLERS]) (int);
192
193 #define VSHASH_SIZE 64
194 #define VSHASH_MASK (VSHASH_SIZE-1)
195 #define VSHASH(vid) ((vid)&VSHASH_MASK)
196
197 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
198
199 static struct SalvageQueueNode *
200 LookupNode(afs_uint32 vid, char * partName,
201            struct SalvageQueueNode ** parent)
202 {
203     struct rx_queue *qp, *nqp;
204     struct SalvageQueueNode *vsp;
205     int idx = VSHASH(vid);
206
207     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
208         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
209         if ((vsp->command.sop.volume == vid) &&
210             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
211             break;
212         }
213     }
214
215     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
216         vsp = NULL;
217     }
218
219     if (parent) {
220         if (vsp) {
221             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
222                 vsp->volgroup.parent : vsp;
223         } else {
224             *parent = NULL;
225         }
226     }
227
228     return vsp;
229 }
230
231 static struct SalvageQueueNode *
232 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
233                     struct SalvageQueueNode ** parent)
234 {
235     return LookupNode(qry->volume, qry->partName, parent);
236 }
237
238 static void
239 AddNodeToHash(struct SalvageQueueNode * node)
240 {
241     int idx = VSHASH(node->command.sop.volume);
242
243     if (queue_IsOnQueue(&node->hash_chain)) {
244         return;
245     }
246
247     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
248     SalvageHashTable[idx].len++;
249 }
250
251 #if 0
252 static void
253 DeleteNodeFromHash(struct SalvageQueueNode * node)
254 {
255     int idx = VSHASH(node->command.sop.volume);
256
257     if (queue_IsNotOnQueue(&node->hash_chain)) {
258         return;
259     }
260
261     queue_Remove(&node->hash_chain);
262     SalvageHashTable[idx].len--;
263 }
264 #endif
265
266 void
267 SALVSYNC_salvInit(void)
268 {
269     int i;
270     pthread_t tid;
271     pthread_attr_t tattr;
272
273     /* initialize the queues */
274     Lock_Init(&SALVSYNC_handler_lock);
275     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
276     for (i = 0; i <= VOLMAXPARTS; i++) {
277         queue_Init(&salvageQueue.part[i]);
278         salvageQueue.len[i] = 0;
279     }
280     assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
281     queue_Init(&pendingQueue);
282     salvageQueue.total_len = pendingQueue.len = 0;
283     salvageQueue.last_insert = -1;
284     memset(partition_salvaging, 0, sizeof(partition_salvaging));
285
286     for (i = 0; i < VSHASH_SIZE; i++) {
287         assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
288         SalvageHashTable[i].len = 0;
289         queue_Init(&SalvageHashTable[i]);
290     }
291
292     /* start the salvsync thread */
293     assert(pthread_attr_init(&tattr) == 0);
294     assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
295     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
296 }
297
298 static void
299 CleanFDs(void)
300 {
301     int i;
302     for (i = 0; i < MAXHANDLERS; ++i) {
303         if (HandlerFD[i] >= 0) {
304             SALVSYNC_Drop(HandlerFD[i]);
305         }
306     }
307
308     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
309      * have a handler */
310     close(salvsync_server_state.fd);
311     salvsync_server_state.fd = -1;
312 }
313
314 static fd_set SALVSYNC_readfds;
315
316 static void *
317 SALVSYNC_syncThread(void * args)
318 {
319     int code;
320     SYNC_server_state_t * state = &salvsync_server_state;
321
322     /* when we fork, the child needs to close the salvsync server sockets,
323      * otherwise, it may get salvsync requests, instead of the parent
324      * salvageserver */
325     assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
326
327     SYNC_getAddr(&state->endpoint, &state->addr);
328     SYNC_cleanupSock(state);
329
330 #ifndef AFS_NT40_ENV
331     (void)signal(SIGPIPE, SIG_IGN);
332 #endif
333
334     state->fd = SYNC_getSock(&state->endpoint);
335     code = SYNC_bindSock(state);
336     assert(!code);
337
338     InitHandler();
339     AcceptOn();
340
341     for (;;) {
342         int maxfd;
343         GetHandler(&SALVSYNC_readfds, &maxfd);
344         /* Note: check for >= 1 below is essential since IOMGR_select
345          * doesn't have exactly same semantics as select.
346          */
347         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
348             CallHandler(&SALVSYNC_readfds);
349     }
350
351     return NULL;
352 }
353
354 static void
355 SALVSYNC_newconnection(int afd)
356 {
357 #ifdef USE_UNIX_SOCKETS
358     struct sockaddr_un other;
359 #else  /* USE_UNIX_SOCKETS */
360     struct sockaddr_in other;
361 #endif
362     int fd;
363     socklen_t junk;
364
365     junk = sizeof(other);
366     fd = accept(afd, (struct sockaddr *)&other, &junk);
367     if (fd == -1) {
368         Log("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
369         assert(1 == 2);
370     } else if (!AddHandler(fd, SALVSYNC_com)) {
371         AcceptOff();
372         assert(AddHandler(fd, SALVSYNC_com));
373     }
374 }
375
376 /* this function processes commands from an salvsync file descriptor (fd) */
377 static afs_int32 SALV_cnt = 0;
378 static void
379 SALVSYNC_com(osi_socket fd)
380 {
381     SYNC_command com;
382     SYNC_response res;
383     SALVSYNC_response_hdr sres_hdr;
384     SALVSYNC_command scom;
385     SALVSYNC_response sres;
386     SYNC_PROTO_BUF_DECL(buf);
387
388     memset(&com, 0, sizeof(com));
389     memset(&res, 0, sizeof(res));
390     memset(&scom, 0, sizeof(scom));
391     memset(&sres, 0, sizeof(sres));
392     memset(&sres_hdr, 0, sizeof(sres_hdr));
393     
394     com.payload.buf = (void *)buf;
395     com.payload.len = SYNC_PROTO_MAX_LEN;
396     res.payload.buf = (void *) &sres_hdr;
397     res.payload.len = sizeof(sres_hdr);
398     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
399     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
400
401     scom.hdr = &com.hdr;
402     scom.sop = (SALVSYNC_command_hdr *) buf;
403     scom.com = &com;
404     sres.hdr = &res.hdr;
405     sres.sop = &sres_hdr;
406     sres.res = &res;
407
408     SALV_cnt++;
409     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
410         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
411         SALVSYNC_Drop(fd);
412         return;
413     }
414
415     if (com.recv_len < sizeof(com.hdr)) {
416         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
417         res.hdr.response = SYNC_COM_ERROR;
418         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
419         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
420         goto respond;
421     }
422
423     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
424         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
425         res.hdr.response = SYNC_COM_ERROR;
426         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
427         goto respond;
428     }
429
430     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
431         res.hdr.response = SYNC_OK;
432         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
433
434         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
435          * never wait for a response. */
436         goto done;
437     }
438
439     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
440         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
441         res.hdr.response = SYNC_COM_ERROR;
442         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
443         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
444         goto respond;
445     }
446
447     res.hdr.com_seq = com.hdr.com_seq;
448
449     VOL_LOCK;
450     switch (com.hdr.command) {
451     case SALVSYNC_NOP:
452         break;
453     case SALVSYNC_SALVAGE:
454     case SALVSYNC_RAISEPRIO:
455         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
456         break;
457     case SALVSYNC_CANCEL:
458         /* cancel a salvage */
459         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
460         break;
461     case SALVSYNC_CANCELALL:
462         /* cancel all queued salvages */
463         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
464         break;
465     case SALVSYNC_QUERY:
466         /* query whether a volume is done salvaging */
467         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
468         break;
469     case SALVSYNC_OP_LINK:
470         /* link a clone to its parent in the scheduler */
471         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
472         break;
473     default:
474         res.hdr.response = SYNC_BAD_COMMAND;
475         break;
476     }
477
478     sres_hdr.sq_len = salvageQueue.total_len;
479     sres_hdr.pq_len = pendingQueue.len;
480     VOL_UNLOCK;
481
482  respond:
483     SYNC_putRes(&salvsync_server_state, fd, &res);
484
485  done:
486     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
487         SALVSYNC_Drop(fd);
488     }
489 }
490
491 /**
492  * request that a volume be salvaged.
493  *
494  * @param[in]  com  inbound command object
495  * @param[out] res  outbound response object
496  *
497  * @return operation status
498  *    @retval SYNC_OK success
499  *    @retval SYNC_DENIED failed to enqueue request
500  *    @retval SYNC_FAILED malformed command packet
501  *
502  * @note this is a SALVSYNC protocol rpc handler
503  *
504  * @internal
505  *
506  * @post the volume is enqueued in the to-be-salvaged queue.  
507  *       if the volume was already in the salvage queue, its 
508  *       priority (and thus its location in the queue) are 
509  *       updated.
510  */
511 static afs_int32
512 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
513 {
514     afs_int32 code = SYNC_OK;
515     struct SalvageQueueNode * node, * clone;
516     int hash = 0;
517
518     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
519         code = SYNC_FAILED;
520         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
521         goto done;
522     }
523
524     clone = LookupNodeByCommand(com->sop, &node);
525
526     if (node == NULL) {
527         if (AllocNode(&node)) {
528             code = SYNC_DENIED;
529             res->hdr->reason = SYNC_REASON_NOMEM;
530             goto done;
531         }
532         clone = node;
533         hash = 1;
534     }
535
536     HandlePrio(clone, node, com->sop->prio);
537
538     switch (node->state) {
539     case SALVSYNC_STATE_QUEUED:
540         UpdateCommandPrio(node);
541         break;
542
543     case SALVSYNC_STATE_ERROR:
544     case SALVSYNC_STATE_DONE:
545     case SALVSYNC_STATE_UNKNOWN:
546         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
547         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
548
549         /* 
550          * make sure volgroup parent partition path is kept coherent
551          *
552          * If we ever want to support non-COW clones on a machine holding
553          * the RW site, please note that this code does not work under the
554          * conditions where someone zaps a COW clone on partition X, and
555          * subsequently creates a full clone on partition Y -- we'd need
556          * an inverse to SALVSYNC_com_Link.
557          *  -- tkeiser 11/28/2007
558          */
559         strcpy(node->command.sop.partName, com->sop->partName);
560
561         if (AddToSalvageQueue(node)) {
562             code = SYNC_DENIED;
563         }
564         break;
565
566     default:
567         break;
568     }
569
570     if (hash) {
571         AddNodeToHash(node);
572     }
573
574     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
575     res->sop->state = node->state;
576     res->sop->prio = node->command.sop.prio;
577
578  done:
579     return code;
580 }
581
582 /**
583  * cancel a pending salvage request.
584  *
585  * @param[in]  com  inbound command object
586  * @param[out] res  outbound response object
587  *
588  * @return operation status
589  *    @retval SYNC_OK success
590  *    @retval SYNC_FAILED malformed command packet
591  *
592  * @note this is a SALVSYNC protocol rpc handler
593  *
594  * @internal
595  */
596 static afs_int32
597 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
598 {
599     afs_int32 code = SYNC_OK;
600     struct SalvageQueueNode * node;
601
602     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
603         code = SYNC_FAILED;
604         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
605         goto done;
606     }
607
608     node = LookupNodeByCommand(com->sop, NULL);
609
610     if (node == NULL) {
611         res->sop->state = SALVSYNC_STATE_UNKNOWN;
612         res->sop->prio = 0;
613     } else {
614         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
615         res->sop->prio = node->command.sop.prio;
616         res->sop->state = node->state;
617         if ((node->type == SALVSYNC_VOLGROUP_PARENT) && 
618             (node->state == SALVSYNC_STATE_QUEUED)) {
619             DeleteFromSalvageQueue(node);
620         }
621     }
622
623  done:
624     return code;
625 }
626
627 /**
628  * cancel all pending salvage requests.
629  *
630  * @param[in]  com  incoming command object
631  * @param[out] res  outbound response object
632  *
633  * @return operation status
634  *    @retval SYNC_OK success
635  *
636  * @note this is a SALVSYNC protocol rpc handler
637  *
638  * @internal
639  */
640 static afs_int32
641 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
642 {
643     struct SalvageQueueNode * np, *nnp;
644     struct DiskPartition64 * dp;
645
646     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
647         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
648             DeleteFromSalvageQueue(np);
649         }
650     }
651
652     return SYNC_OK;
653 }
654
655 /**
656  * link a queue node for a clone to its parent volume.
657  *
658  * @param[in]  com   inbound command object
659  * @param[out] res   outbound response object
660  *
661  * @return operation status
662  *    @retval SYNC_OK success
663  *    @retval SYNC_FAILED malformed command packet
664  *    @retval SYNC_DENIED the request could not be completed
665  *
666  * @note this is a SALVSYNC protocol rpc handler
667  *
668  * @post the requested volume is marked as a child of another volume.
669  *       thus, future salvage requests for this volume will result in the
670  *       parent of the volume group being scheduled for salvage instead
671  *       of this clone.
672  *
673  * @internal
674  */
675 static afs_int32
676 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
677 {
678     afs_int32 code = SYNC_OK;
679     struct SalvageQueueNode * clone, * parent;
680
681     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
682         code = SYNC_FAILED;
683         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
684         goto done;
685     }
686
687     /* lookup clone's salvage scheduling node */
688     clone = LookupNodeByCommand(com->sop, NULL);
689     if (clone == NULL) {
690         code = SYNC_DENIED;
691         res->hdr->reason = SALVSYNC_REASON_ERROR;
692         goto done;
693     }
694
695     /* lookup parent's salvage scheduling node */
696     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
697     if (parent == NULL) {
698         if (AllocNode(&parent)) {
699             code = SYNC_DENIED;
700             res->hdr->reason = SYNC_REASON_NOMEM;
701             goto done;
702         }
703         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
704         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
705         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
706         AddNodeToHash(parent);
707     }
708
709     if (LinkNode(parent, clone)) {
710         code = SYNC_DENIED;
711         goto done;
712     }
713
714  done:
715     return code;
716 }
717
718 /**
719  * query the status of a volume salvage request.
720  *
721  * @param[in]  com   inbound command object
722  * @param[out] res   outbound response object
723  *
724  * @return operation status
725  *    @retval SYNC_OK success
726  *    @retval SYNC_FAILED malformed command packet
727  *
728  * @note this is a SALVSYNC protocol rpc handler
729  *
730  * @internal
731  */
732 static afs_int32
733 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
734 {
735     afs_int32 code = SYNC_OK;
736     struct SalvageQueueNode * node;
737
738     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
739         code = SYNC_FAILED;
740         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
741         goto done;
742     }
743
744     LookupNodeByCommand(com->sop, &node);
745
746     /* query whether a volume is done salvaging */
747     if (node == NULL) {
748         res->sop->state = SALVSYNC_STATE_UNKNOWN;
749         res->sop->prio = 0;
750     } else {
751         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
752         res->sop->state = node->state;
753         res->sop->prio = node->command.sop.prio;
754     }
755
756  done:
757     return code;
758 }
759
760 static void
761 SALVSYNC_Drop(osi_socket fd)
762 {
763     RemoveHandler(fd);
764 #ifdef AFS_NT40_ENV
765     closesocket(fd);
766 #else
767     close(fd);
768 #endif
769     AcceptOn();
770 }
771
772 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
773
774 static void
775 AcceptOn(void)
776 {
777     if (AcceptHandler == -1) {
778         assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
779         AcceptHandler = FindHandler(salvsync_server_state.fd);
780     }
781 }
782
783 static void
784 AcceptOff(void)
785 {
786     if (AcceptHandler != -1) {
787         assert(RemoveHandler(salvsync_server_state.fd));
788         AcceptHandler = -1;
789     }
790 }
791
792 /* The multiple FD handling code. */
793
794 static void
795 InitHandler(void)
796 {
797     register int i;
798     ObtainWriteLock(&SALVSYNC_handler_lock);
799     for (i = 0; i < MAXHANDLERS; i++) {
800         HandlerFD[i] = -1;
801         HandlerProc[i] = NULL;
802     }
803     ReleaseWriteLock(&SALVSYNC_handler_lock);
804 }
805
806 static void
807 CallHandler(fd_set * fdsetp)
808 {
809     register int i;
810     ObtainReadLock(&SALVSYNC_handler_lock);
811     for (i = 0; i < MAXHANDLERS; i++) {
812         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
813             ReleaseReadLock(&SALVSYNC_handler_lock);
814             (*HandlerProc[i]) (HandlerFD[i]);
815             ObtainReadLock(&SALVSYNC_handler_lock);
816         }
817     }
818     ReleaseReadLock(&SALVSYNC_handler_lock);
819 }
820
821 static int
822 AddHandler(osi_socket afd, void (*aproc) (int))
823 {
824     register int i;
825     ObtainWriteLock(&SALVSYNC_handler_lock);
826     for (i = 0; i < MAXHANDLERS; i++)
827         if (HandlerFD[i] == -1)
828             break;
829     if (i >= MAXHANDLERS) {
830         ReleaseWriteLock(&SALVSYNC_handler_lock);
831         return 0;
832     }
833     HandlerFD[i] = afd;
834     HandlerProc[i] = aproc;
835     ReleaseWriteLock(&SALVSYNC_handler_lock);
836     return 1;
837 }
838
839 static int
840 FindHandler(register osi_socket afd)
841 {
842     register int i;
843     ObtainReadLock(&SALVSYNC_handler_lock);
844     for (i = 0; i < MAXHANDLERS; i++)
845         if (HandlerFD[i] == afd) {
846             ReleaseReadLock(&SALVSYNC_handler_lock);
847             return i;
848         }
849     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
850     assert(1 == 2);
851     return -1;                  /* satisfy compiler */
852 }
853
854 static int
855 FindHandler_r(register osi_socket afd)
856 {
857     register int i;
858     for (i = 0; i < MAXHANDLERS; i++)
859         if (HandlerFD[i] == afd) {
860             return i;
861         }
862     assert(1 == 2);
863     return -1;                  /* satisfy compiler */
864 }
865
866 static int
867 RemoveHandler(register osi_socket afd)
868 {
869     ObtainWriteLock(&SALVSYNC_handler_lock);
870     HandlerFD[FindHandler_r(afd)] = -1;
871     ReleaseWriteLock(&SALVSYNC_handler_lock);
872     return 1;
873 }
874
875 static void
876 GetHandler(fd_set * fdsetp, int *maxfdp)
877 {
878     register int i;
879     register int maxfd = -1;
880     FD_ZERO(fdsetp);
881     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
882     for (i = 0; i < MAXHANDLERS; i++)
883         if (HandlerFD[i] != -1) {
884             FD_SET(HandlerFD[i], fdsetp);
885             if (maxfd < HandlerFD[i])
886                 maxfd = HandlerFD[i];
887         }
888     *maxfdp = maxfd;
889     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
890 }
891
892 /**
893  * allocate a salvage queue node.
894  *
895  * @param[out] node_out  address in which to store new node pointer
896  *
897  * @return operation status
898  *    @retval 0 success
899  *    @retval 1 failed to allocate node
900  *
901  * @internal
902  */
903 static int
904 AllocNode(struct SalvageQueueNode ** node_out)
905 {
906     int code = 0;
907     struct SalvageQueueNode * node;
908
909     *node_out = node = (struct SalvageQueueNode *) 
910         malloc(sizeof(struct SalvageQueueNode));
911     if (node == NULL) {
912         code = 1;
913         goto done;
914     }
915
916     memset(node, 0, sizeof(struct SalvageQueueNode));
917     node->type = SALVSYNC_VOLGROUP_PARENT;
918     node->state = SALVSYNC_STATE_UNKNOWN;
919
920  done:
921     return code;
922 }
923
924 /**
925  * link a salvage queue node to its parent.
926  *
927  * @param[in] parent  pointer to queue node for parent of volume group
928  * @param[in] clone   pointer to queue node for a clone
929  *
930  * @return operation status
931  *    @retval 0 success
932  *    @retval 1 failure
933  *
934  * @internal
935  */
936 static int
937 LinkNode(struct SalvageQueueNode * parent,
938          struct SalvageQueueNode * clone)
939 {
940     int code = 0;
941     int idx;
942
943     /* check for attaching a clone to a clone */
944     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
945         code = 1;
946         goto done;
947     }
948
949     /* check for pre-existing registration and openings */
950     for (idx = 0; idx < VOLMAXTYPES; idx++) {
951         if (parent->volgroup.children[idx] == clone) {
952             goto linked;
953         }
954         if (parent->volgroup.children[idx] == NULL) {
955             break;
956         }
957     }
958     if (idx == VOLMAXTYPES) {
959         code = 1;
960         goto done;
961     }
962
963     /* link parent and child */
964     parent->volgroup.children[idx] = clone;
965     clone->type = SALVSYNC_VOLGROUP_CLONE;
966     clone->volgroup.parent = parent;
967
968
969  linked:
970     switch (clone->state) {
971     case SALVSYNC_STATE_QUEUED:
972         DeleteFromSalvageQueue(clone);
973
974     case SALVSYNC_STATE_SALVAGING:
975         switch (parent->state) {
976         case SALVSYNC_STATE_UNKNOWN:
977         case SALVSYNC_STATE_ERROR:
978         case SALVSYNC_STATE_DONE:
979             parent->command.sop.prio = clone->command.sop.prio;
980             AddToSalvageQueue(parent);
981             break;
982
983         case SALVSYNC_STATE_QUEUED:
984             if (clone->command.sop.prio) {
985                 parent->command.sop.prio += clone->command.sop.prio;
986                 UpdateCommandPrio(parent);
987             }
988             break;
989
990         default:
991             break;
992         }
993         break;
994
995     default:
996         break;
997     }
998
999  done:
1000     return code;
1001 }
1002
1003 static void
1004 HandlePrio(struct SalvageQueueNode * clone, 
1005            struct SalvageQueueNode * node,
1006            afs_uint32 new_prio)
1007 {
1008     afs_uint32 delta;
1009
1010     switch (node->state) {
1011     case SALVSYNC_STATE_ERROR:
1012     case SALVSYNC_STATE_DONE:
1013     case SALVSYNC_STATE_UNKNOWN:
1014         node->command.sop.prio = 0;
1015         break;
1016     default:
1017         break;
1018     }
1019
1020     if (new_prio < clone->command.sop.prio) {
1021         /* strange. let's just set our delta to 1 */
1022         delta = 1;
1023     } else {
1024         delta = new_prio - clone->command.sop.prio;
1025     }
1026
1027     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1028         clone->command.sop.prio = new_prio;
1029     }
1030
1031     node->command.sop.prio += delta;
1032 }
1033
1034 static int
1035 AddToSalvageQueue(struct SalvageQueueNode * node)
1036 {
1037     afs_int32 id;
1038     struct SalvageQueueNode * last = NULL;
1039
1040     id = volutil_GetPartitionID(node->command.sop.partName);
1041     if (id < 0 || id > VOLMAXPARTS) {
1042         return 1;
1043     }
1044     if (!VGetPartitionById_r(id, 0)) {
1045         /* don't enqueue salvage requests for unmounted partitions */
1046         return 1;
1047     }
1048     if (queue_IsOnQueue(node)) {
1049         return 0;
1050     }
1051
1052     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1053         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1054     }
1055     queue_Append(&salvageQueue.part[id], node);
1056     salvageQueue.len[id]++;
1057     salvageQueue.total_len++;
1058     salvageQueue.last_insert = id;
1059     node->partition_id = id;
1060     node->state = SALVSYNC_STATE_QUEUED;
1061
1062     /* reorder, if necessary */
1063     if (last && last->command.sop.prio < node->command.sop.prio) {
1064         UpdateCommandPrio(node);
1065     }
1066
1067     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1068     return 0;
1069 }
1070
1071 static void
1072 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1073 {
1074     if (queue_IsOnQueue(node)) {
1075         queue_Remove(node);
1076         salvageQueue.len[node->partition_id]--;
1077         salvageQueue.total_len--;
1078         node->state = SALVSYNC_STATE_UNKNOWN;
1079         assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1080     }
1081 }
1082
1083 static void
1084 AddToPendingQueue(struct SalvageQueueNode * node)
1085 {
1086     queue_Append(&pendingQueue, node);
1087     pendingQueue.len++;
1088     node->state = SALVSYNC_STATE_SALVAGING;
1089     assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1090 }
1091
1092 static void
1093 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1094 {
1095     if (queue_IsOnQueue(node)) {
1096         queue_Remove(node);
1097         pendingQueue.len--;
1098         node->state = SALVSYNC_STATE_UNKNOWN;
1099         assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1100     }
1101 }
1102
1103 #if 0
1104 static struct SalvageQueueNode *
1105 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1106 {
1107     struct SalvageQueueNode * np, * nnp;
1108
1109     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1110         if ((np->command.sop.volume == qry->volume) && 
1111             !strncmp(np->command.sop.partName, qry->partName,
1112                      sizeof(qry->partName)))
1113             break;
1114     }
1115
1116     if (queue_IsEnd(&pendingQueue, np))
1117         np = NULL;
1118     return np;
1119 }
1120 #endif
1121
1122 static struct SalvageQueueNode *
1123 LookupPendingCommandByPid(int pid)
1124 {
1125     struct SalvageQueueNode * np, * nnp;
1126
1127     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1128         if (np->pid == pid)
1129             break;
1130     }
1131
1132     if (queue_IsEnd(&pendingQueue, np))
1133         np = NULL;
1134     return np;
1135 }
1136
1137
1138 /* raise the priority of a previously scheduled salvage */
1139 static void
1140 UpdateCommandPrio(struct SalvageQueueNode * node)
1141 {
1142     struct SalvageQueueNode *np, *nnp;
1143     afs_int32 id;
1144     afs_uint32 prio;
1145
1146     assert(queue_IsOnQueue(node));
1147
1148     prio = node->command.sop.prio;
1149     id = node->partition_id;
1150     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1151         queue_Remove(node);
1152         queue_Prepend(&salvageQueue.part[id], node);
1153     } else {
1154         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1155             if (np->command.sop.prio > prio)
1156                 break;
1157         }
1158         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1159             queue_Remove(node);
1160             queue_Prepend(&salvageQueue.part[id], node);
1161         } else if (node != np) {
1162             queue_Remove(node);
1163             queue_InsertAfter(np, node);
1164         }
1165     }
1166 }
1167
1168 /* this will need to be rearchitected if we ever want more than one thread
1169  * to wait for new salvage nodes */
1170 struct SalvageQueueNode * 
1171 SALVSYNC_getWork(void)
1172 {
1173     int i;
1174     struct DiskPartition64 * dp = NULL, * fdp;
1175     static afs_int32 next_part_sched = 0;
1176     struct SalvageQueueNode *node = NULL;
1177
1178     VOL_LOCK;
1179
1180     /*
1181      * wait for work to be scheduled
1182      * if there are no disk partitions, just sit in this wait loop forever
1183      */
1184     while (!salvageQueue.total_len || !DiskPartitionList) {
1185         VOL_CV_WAIT(&salvageQueue.cv);
1186     }
1187
1188     /* 
1189      * short circuit for simple case where only one partition has
1190      * scheduled salvages
1191      */
1192     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1193         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1194         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1195         goto have_node;
1196     }
1197
1198
1199     /* 
1200      * ok, more than one partition has scheduled salvages.
1201      * now search for partitions with scheduled salvages, but no pending salvages. 
1202      */
1203     dp = VGetPartitionById_r(next_part_sched, 0);
1204     if (!dp) {
1205         dp = DiskPartitionList;
1206     }
1207     fdp = dp;
1208
1209     for (i=0 ; 
1210          !i || dp != fdp ; 
1211          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1212         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1213             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1214             goto have_node;
1215         }
1216     }
1217
1218
1219     /*
1220      * all partitions with scheduled salvages have at least one pending.
1221      * now do an exhaustive search for a scheduled salvage.
1222      */
1223     dp = fdp;
1224
1225     for (i=0 ; 
1226          !i || dp != fdp ; 
1227          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1228         if (salvageQueue.len[dp->index]) {
1229             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1230             goto have_node;
1231         }
1232     }
1233
1234     /* we should never reach this line */
1235     assert(1==2);
1236
1237  have_node:
1238     assert(node != NULL);
1239     node->pid = 0;
1240     partition_salvaging[node->partition_id]++;
1241     DeleteFromSalvageQueue(node);
1242     AddToPendingQueue(node);
1243
1244     if (dp) {
1245         /* update next_part_sched field */
1246         if (dp->next) {
1247             next_part_sched = dp->next->index;
1248         } else if (DiskPartitionList) {
1249             next_part_sched = DiskPartitionList->index;
1250         } else {
1251             next_part_sched = -1;
1252         }
1253     }
1254
1255     VOL_UNLOCK;
1256     return node;
1257 }
1258
1259 /**
1260  * update internal scheduler state to reflect completion of a work unit.
1261  *
1262  * @param[in]  node    salvage queue node object pointer
1263  * @param[in]  result  worker process result code
1264  *
1265  * @post scheduler state is updated.
1266  *
1267  * @internal
1268  */
1269 static void
1270 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1271 {
1272     afs_int32 partid;
1273     int idx;
1274
1275     DeleteFromPendingQueue(node);
1276     partid = node->partition_id;
1277     if (partid >=0 && partid <= VOLMAXPARTS) {
1278         partition_salvaging[partid]--;
1279     }
1280     if (result == 0) {
1281         node->state = SALVSYNC_STATE_DONE;
1282     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1283         node->state = SALVSYNC_STATE_ERROR;
1284     }
1285
1286     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1287         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1288             if (node->volgroup.children[idx]) {
1289                 node->volgroup.children[idx]->state = node->state;
1290             }
1291         }
1292     }
1293 }
1294
1295 /**
1296  * check whether worker child failed.
1297  *
1298  * @param[in] status  status bitfield return by wait()
1299  *
1300  * @return boolean failure code
1301  *    @retval 0 child succeeded
1302  *    @retval 1 child failed
1303  *
1304  * @internal
1305  */
1306 static int
1307 ChildFailed(int status)
1308 {
1309     return (WCOREDUMP(status) || 
1310             WIFSIGNALED(status) || 
1311             ((WEXITSTATUS(status) != 0) && 
1312              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1313 }
1314
1315
1316 /**
1317  * notify salvsync scheduler of node completion, by child pid.
1318  *
1319  * @param[in]  pid     pid of worker child
1320  * @param[in]  status  worker status bitfield from wait()
1321  *
1322  * @post scheduler state is updated.
1323  *       if status code is a failure, fileserver notification was attempted
1324  *
1325  * @see SALVSYNC_doneWork_r
1326  */
1327 void
1328 SALVSYNC_doneWorkByPid(int pid, int status)
1329 {
1330     struct SalvageQueueNode * node;
1331     char partName[16];
1332     afs_uint32 volids[VOLMAXTYPES+1];
1333     unsigned int idx;
1334
1335     memset(volids, 0, sizeof(volids));
1336
1337     VOL_LOCK;
1338     node = LookupPendingCommandByPid(pid);
1339     if (node != NULL) {
1340         SALVSYNC_doneWork_r(node, status);
1341
1342         if (ChildFailed(status)) {
1343             /* populate volume id list for later processing outside the glock */
1344             volids[0] = node->command.sop.volume;
1345             strcpy(partName, node->command.sop.partName);
1346             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1347                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1348                     if (node->volgroup.children[idx]) {
1349                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1350                     }
1351                 }
1352             }
1353         }
1354     }
1355     VOL_UNLOCK;
1356
1357     /*
1358      * if necessary, notify fileserver of
1359      * failure to salvage volume group
1360      * [we cannot guarantee that the child made the
1361      *  appropriate notifications (e.g. SIGSEGV)]
1362      *  -- tkeiser 11/28/2007
1363      */
1364     if (ChildFailed(status)) {
1365         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1366             if (volids[idx]) {
1367                 FSYNC_VolOp(volids[idx],
1368                             partName,
1369                             FSYNC_VOL_FORCE_ERROR,
1370                             FSYNC_WHATEVER,
1371                             NULL);
1372             }
1373         }
1374     }
1375 }
1376
1377 #endif /* AFS_DEMAND_ATTACH_FS */