db5848a02fd5c7854e3a4fdb9091693f846bdbe1
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29
30 #include <sys/types.h>
31 #include <stdio.h>
32 #ifdef AFS_NT40_ENV
33 #include <winsock2.h>
34 #include <time.h>
35 #else
36 #include <sys/param.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 #include <netdb.h>
40 #include <sys/time.h>
41 #endif
42 #include <errno.h>
43 #include <assert.h>
44 #include <signal.h>
45 #include <string.h>
46
47
48 #include <rx/xdr.h>
49 #include <afs/afsint.h>
50 #include "nfs.h"
51 #include <afs/errors.h>
52 #include "salvsync.h"
53 #include "lwp.h"
54 #include "lock.h"
55 #include <afs/afssyscalls.h>
56 #include "ihandle.h"
57 #include "vnode.h"
58 #include "volume.h"
59 #include "partition.h"
60 #include "common.h"
61 #include <rx/rx_queue.h>
62 #include <afs/procmgmt.h>
63
64 #if !defined(offsetof)
65 #include <stddef.h>
66 #endif
67
68 #ifdef USE_UNIX_SOCKETS
69 #include <afs/afsutil.h>
70 #include <sys/un.h>
71 #endif
72
73 #ifndef WCOREDUMP
74 #define WCOREDUMP(x)    ((x) & 0200)
75 #endif
76
77 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
78                                  * move = dump+restore can run on single server */
79
80
81 /*
82  * This lock controls access to the handler array.
83  */
84 struct Lock SALVSYNC_handler_lock;
85
86
87 #ifdef AFS_DEMAND_ATTACH_FS
88 /*
89  * SALVSYNC is a feature specific to the demand attach fileserver
90  */
91
92 /* Forward declarations */
93 static void * SALVSYNC_syncThread(void *);
94 static void SALVSYNC_newconnection(osi_socket fd);
95 static void SALVSYNC_com(osi_socket fd);
96 static void SALVSYNC_Drop(osi_socket fd);
97 static void AcceptOn(void);
98 static void AcceptOff(void);
99 static void InitHandler(void);
100 static void CallHandler(fd_set * fdsetp);
101 static int AddHandler(osi_socket afd, void (*aproc) (int));
102 static int FindHandler(osi_socket afd);
103 static int FindHandler_r(osi_socket afd);
104 static int RemoveHandler(osi_socket afd);
105 static void GetHandler(fd_set * fdsetp, int *maxfdp);
106
107 static int AllocNode(struct SalvageQueueNode ** node);
108
109 static int AddToSalvageQueue(struct SalvageQueueNode * node);
110 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
111 static void AddToPendingQueue(struct SalvageQueueNode * node);
112 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
113 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
114 static void UpdateCommandPrio(struct SalvageQueueNode * node);
115 static void HandlePrio(struct SalvageQueueNode * clone,
116                        struct SalvageQueueNode * parent,
117                        afs_uint32 new_prio);
118
119 static int LinkNode(struct SalvageQueueNode * parent,
120                     struct SalvageQueueNode * clone);
121
122 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
123                                             struct SalvageQueueNode ** parent);
124 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
125                                                      struct SalvageQueueNode ** parent);
126 static void AddNodeToHash(struct SalvageQueueNode * node);
127
128 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
129 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
130 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
131 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
132 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
133
134
135 extern int LogLevel;
136 extern int VInit;
137 extern pthread_mutex_t vol_salvsync_mutex;
138
139 /**
140  * salvsync server socket handle.
141  */
142 static SYNC_server_state_t salvsync_server_state =
143     { -1,                       /* file descriptor */
144       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
145       SALVSYNC_PROTO_VERSION,   /* protocol version */
146       5,                        /* bind() retry limit */
147       100,                      /* listen() queue depth */
148       "SALVSYNC",               /* protocol name string */
149     };
150
151
152 /**
153  * queue of all volumes waiting to be salvaged.
154  */
155 struct SalvageQueue {
156     volatile int total_len;
157     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
158     volatile int len[VOLMAXPARTS+1];
159     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
160     pthread_cond_t cv;
161 };
162 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
163
164 /**
165  * queue of all volumes currently being salvaged.
166  */
167 struct QueueHead {
168     volatile struct rx_queue q;  /**< queue of salvages in progress */
169     volatile int len;            /**< length of in-progress queue */
170     pthread_cond_t queue_change_cv;
171 };
172 static struct QueueHead pendingQueue;  /* volumes being salvaged */
173
174 /* XXX
175  * whether a partition has a salvage in progress
176  *
177  * the salvager code only permits one salvage per partition at a time
178  *
179  * the following hack tries to keep salvaged parallelism high by
180  * only permitting one salvage dispatch per partition at a time
181  *
182  * unfortunately, the parallel salvager currently
183  * has a rather braindead routine that won't permit
184  * multiple salvages on the same "device".  this
185  * function happens to break pretty badly on lvm, raid luns, etc.
186  *
187  * this hack isn't good enough to stop the device limiting code from
188  * crippling performance.  someday that code needs to be rewritten
189  */
190 static int partition_salvaging[VOLMAXPARTS+1];
191
192 static int HandlerFD[MAXHANDLERS];
193 static void (*HandlerProc[MAXHANDLERS]) (int);
194
195 #define VSHASH_SIZE 64
196 #define VSHASH_MASK (VSHASH_SIZE-1)
197 #define VSHASH(vid) ((vid)&VSHASH_MASK)
198
199 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
200
201 static struct SalvageQueueNode *
202 LookupNode(afs_uint32 vid, char * partName,
203            struct SalvageQueueNode ** parent)
204 {
205     struct rx_queue *qp, *nqp;
206     struct SalvageQueueNode *vsp;
207     int idx = VSHASH(vid);
208
209     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
210         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
211         if ((vsp->command.sop.volume == vid) &&
212             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
213             break;
214         }
215     }
216
217     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
218         vsp = NULL;
219     }
220
221     if (parent) {
222         if (vsp) {
223             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
224                 vsp->volgroup.parent : vsp;
225         } else {
226             *parent = NULL;
227         }
228     }
229
230     return vsp;
231 }
232
233 static struct SalvageQueueNode *
234 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
235                     struct SalvageQueueNode ** parent)
236 {
237     return LookupNode(qry->volume, qry->partName, parent);
238 }
239
240 static void
241 AddNodeToHash(struct SalvageQueueNode * node)
242 {
243     int idx = VSHASH(node->command.sop.volume);
244
245     if (queue_IsOnQueue(&node->hash_chain)) {
246         return;
247     }
248
249     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
250     SalvageHashTable[idx].len++;
251 }
252
253 #if 0
254 static void
255 DeleteNodeFromHash(struct SalvageQueueNode * node)
256 {
257     int idx = VSHASH(node->command.sop.volume);
258
259     if (queue_IsNotOnQueue(&node->hash_chain)) {
260         return;
261     }
262
263     queue_Remove(&node->hash_chain);
264     SalvageHashTable[idx].len--;
265 }
266 #endif
267
268 void
269 SALVSYNC_salvInit(void)
270 {
271     int i;
272     pthread_t tid;
273     pthread_attr_t tattr;
274
275     /* initialize the queues */
276     Lock_Init(&SALVSYNC_handler_lock);
277     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
278     for (i = 0; i <= VOLMAXPARTS; i++) {
279         queue_Init(&salvageQueue.part[i]);
280         salvageQueue.len[i] = 0;
281     }
282     assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
283     queue_Init(&pendingQueue);
284     salvageQueue.total_len = pendingQueue.len = 0;
285     salvageQueue.last_insert = -1;
286     memset(partition_salvaging, 0, sizeof(partition_salvaging));
287
288     for (i = 0; i < VSHASH_SIZE; i++) {
289         assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
290         SalvageHashTable[i].len = 0;
291         queue_Init(&SalvageHashTable[i]);
292     }
293
294     /* start the salvsync thread */
295     assert(pthread_attr_init(&tattr) == 0);
296     assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
297     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
298 }
299
300 static void
301 CleanFDs(void)
302 {
303     int i;
304     for (i = 0; i < MAXHANDLERS; ++i) {
305         if (HandlerFD[i] >= 0) {
306             SALVSYNC_Drop(HandlerFD[i]);
307         }
308     }
309
310     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
311      * have a handler */
312     close(salvsync_server_state.fd);
313     salvsync_server_state.fd = -1;
314 }
315
316 static fd_set SALVSYNC_readfds;
317
318 static void *
319 SALVSYNC_syncThread(void * args)
320 {
321     int code;
322     SYNC_server_state_t * state = &salvsync_server_state;
323
324     /* when we fork, the child needs to close the salvsync server sockets,
325      * otherwise, it may get salvsync requests, instead of the parent
326      * salvageserver */
327     assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
328
329     SYNC_getAddr(&state->endpoint, &state->addr);
330     SYNC_cleanupSock(state);
331
332 #ifndef AFS_NT40_ENV
333     (void)signal(SIGPIPE, SIG_IGN);
334 #endif
335
336     state->fd = SYNC_getSock(&state->endpoint);
337     code = SYNC_bindSock(state);
338     assert(!code);
339
340     InitHandler();
341     AcceptOn();
342
343     for (;;) {
344         int maxfd;
345         GetHandler(&SALVSYNC_readfds, &maxfd);
346         /* Note: check for >= 1 below is essential since IOMGR_select
347          * doesn't have exactly same semantics as select.
348          */
349         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
350             CallHandler(&SALVSYNC_readfds);
351     }
352
353     return NULL;
354 }
355
356 static void
357 SALVSYNC_newconnection(int afd)
358 {
359 #ifdef USE_UNIX_SOCKETS
360     struct sockaddr_un other;
361 #else  /* USE_UNIX_SOCKETS */
362     struct sockaddr_in other;
363 #endif
364     int fd;
365     socklen_t junk;
366
367     junk = sizeof(other);
368     fd = accept(afd, (struct sockaddr *)&other, &junk);
369     if (fd == -1) {
370         Log("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
371         assert(1 == 2);
372     } else if (!AddHandler(fd, SALVSYNC_com)) {
373         AcceptOff();
374         assert(AddHandler(fd, SALVSYNC_com));
375     }
376 }
377
378 /* this function processes commands from an salvsync file descriptor (fd) */
379 static afs_int32 SALV_cnt = 0;
380 static void
381 SALVSYNC_com(osi_socket fd)
382 {
383     SYNC_command com;
384     SYNC_response res;
385     SALVSYNC_response_hdr sres_hdr;
386     SALVSYNC_command scom;
387     SALVSYNC_response sres;
388     SYNC_PROTO_BUF_DECL(buf);
389
390     memset(&com, 0, sizeof(com));
391     memset(&res, 0, sizeof(res));
392     memset(&scom, 0, sizeof(scom));
393     memset(&sres, 0, sizeof(sres));
394     memset(&sres_hdr, 0, sizeof(sres_hdr));
395
396     com.payload.buf = (void *)buf;
397     com.payload.len = SYNC_PROTO_MAX_LEN;
398     res.payload.buf = (void *) &sres_hdr;
399     res.payload.len = sizeof(sres_hdr);
400     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
401     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
402
403     scom.hdr = &com.hdr;
404     scom.sop = (SALVSYNC_command_hdr *) buf;
405     scom.com = &com;
406     sres.hdr = &res.hdr;
407     sres.sop = &sres_hdr;
408     sres.res = &res;
409
410     SALV_cnt++;
411     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
412         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
413         SALVSYNC_Drop(fd);
414         return;
415     }
416
417     if (com.recv_len < sizeof(com.hdr)) {
418         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
419         res.hdr.response = SYNC_COM_ERROR;
420         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
421         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
422         goto respond;
423     }
424
425     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
426         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
427         res.hdr.response = SYNC_COM_ERROR;
428         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
429         goto respond;
430     }
431
432     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
433         res.hdr.response = SYNC_OK;
434         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
435
436         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
437          * never wait for a response. */
438         goto done;
439     }
440
441     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
442         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
443         res.hdr.response = SYNC_COM_ERROR;
444         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
445         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
446         goto respond;
447     }
448
449     res.hdr.com_seq = com.hdr.com_seq;
450
451     VOL_LOCK;
452     switch (com.hdr.command) {
453     case SALVSYNC_NOP:
454         break;
455     case SALVSYNC_SALVAGE:
456     case SALVSYNC_RAISEPRIO:
457         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
458         break;
459     case SALVSYNC_CANCEL:
460         /* cancel a salvage */
461         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
462         break;
463     case SALVSYNC_CANCELALL:
464         /* cancel all queued salvages */
465         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
466         break;
467     case SALVSYNC_QUERY:
468         /* query whether a volume is done salvaging */
469         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
470         break;
471     case SALVSYNC_OP_LINK:
472         /* link a clone to its parent in the scheduler */
473         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
474         break;
475     default:
476         res.hdr.response = SYNC_BAD_COMMAND;
477         break;
478     }
479
480     sres_hdr.sq_len = salvageQueue.total_len;
481     sres_hdr.pq_len = pendingQueue.len;
482     VOL_UNLOCK;
483
484  respond:
485     SYNC_putRes(&salvsync_server_state, fd, &res);
486
487  done:
488     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
489         SALVSYNC_Drop(fd);
490     }
491 }
492
493 /**
494  * request that a volume be salvaged.
495  *
496  * @param[in]  com  inbound command object
497  * @param[out] res  outbound response object
498  *
499  * @return operation status
500  *    @retval SYNC_OK success
501  *    @retval SYNC_DENIED failed to enqueue request
502  *    @retval SYNC_FAILED malformed command packet
503  *
504  * @note this is a SALVSYNC protocol rpc handler
505  *
506  * @internal
507  *
508  * @post the volume is enqueued in the to-be-salvaged queue.
509  *       if the volume was already in the salvage queue, its
510  *       priority (and thus its location in the queue) are
511  *       updated.
512  */
513 static afs_int32
514 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
515 {
516     afs_int32 code = SYNC_OK;
517     struct SalvageQueueNode * node, * clone;
518     int hash = 0;
519
520     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
521         code = SYNC_FAILED;
522         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
523         goto done;
524     }
525
526     clone = LookupNodeByCommand(com->sop, &node);
527
528     if (node == NULL) {
529         if (AllocNode(&node)) {
530             code = SYNC_DENIED;
531             res->hdr->reason = SYNC_REASON_NOMEM;
532             goto done;
533         }
534         clone = node;
535         hash = 1;
536     }
537
538     HandlePrio(clone, node, com->sop->prio);
539
540     switch (node->state) {
541     case SALVSYNC_STATE_QUEUED:
542         UpdateCommandPrio(node);
543         break;
544
545     case SALVSYNC_STATE_ERROR:
546     case SALVSYNC_STATE_DONE:
547     case SALVSYNC_STATE_UNKNOWN:
548         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
549         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
550
551         /*
552          * make sure volgroup parent partition path is kept coherent
553          *
554          * If we ever want to support non-COW clones on a machine holding
555          * the RW site, please note that this code does not work under the
556          * conditions where someone zaps a COW clone on partition X, and
557          * subsequently creates a full clone on partition Y -- we'd need
558          * an inverse to SALVSYNC_com_Link.
559          *  -- tkeiser 11/28/2007
560          */
561         strcpy(node->command.sop.partName, com->sop->partName);
562
563         if (AddToSalvageQueue(node)) {
564             code = SYNC_DENIED;
565         }
566         break;
567
568     default:
569         break;
570     }
571
572     if (hash) {
573         AddNodeToHash(node);
574     }
575
576     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
577     res->sop->state = node->state;
578     res->sop->prio = node->command.sop.prio;
579
580  done:
581     return code;
582 }
583
584 /**
585  * cancel a pending salvage request.
586  *
587  * @param[in]  com  inbound command object
588  * @param[out] res  outbound response object
589  *
590  * @return operation status
591  *    @retval SYNC_OK success
592  *    @retval SYNC_FAILED malformed command packet
593  *
594  * @note this is a SALVSYNC protocol rpc handler
595  *
596  * @internal
597  */
598 static afs_int32
599 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
600 {
601     afs_int32 code = SYNC_OK;
602     struct SalvageQueueNode * node;
603
604     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
605         code = SYNC_FAILED;
606         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
607         goto done;
608     }
609
610     node = LookupNodeByCommand(com->sop, NULL);
611
612     if (node == NULL) {
613         res->sop->state = SALVSYNC_STATE_UNKNOWN;
614         res->sop->prio = 0;
615     } else {
616         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
617         res->sop->prio = node->command.sop.prio;
618         res->sop->state = node->state;
619         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
620             (node->state == SALVSYNC_STATE_QUEUED)) {
621             DeleteFromSalvageQueue(node);
622         }
623     }
624
625  done:
626     return code;
627 }
628
629 /**
630  * cancel all pending salvage requests.
631  *
632  * @param[in]  com  incoming command object
633  * @param[out] res  outbound response object
634  *
635  * @return operation status
636  *    @retval SYNC_OK success
637  *
638  * @note this is a SALVSYNC protocol rpc handler
639  *
640  * @internal
641  */
642 static afs_int32
643 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
644 {
645     struct SalvageQueueNode * np, *nnp;
646     struct DiskPartition64 * dp;
647
648     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
649         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
650             DeleteFromSalvageQueue(np);
651         }
652     }
653
654     return SYNC_OK;
655 }
656
657 /**
658  * link a queue node for a clone to its parent volume.
659  *
660  * @param[in]  com   inbound command object
661  * @param[out] res   outbound response object
662  *
663  * @return operation status
664  *    @retval SYNC_OK success
665  *    @retval SYNC_FAILED malformed command packet
666  *    @retval SYNC_DENIED the request could not be completed
667  *
668  * @note this is a SALVSYNC protocol rpc handler
669  *
670  * @post the requested volume is marked as a child of another volume.
671  *       thus, future salvage requests for this volume will result in the
672  *       parent of the volume group being scheduled for salvage instead
673  *       of this clone.
674  *
675  * @internal
676  */
677 static afs_int32
678 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
679 {
680     afs_int32 code = SYNC_OK;
681     struct SalvageQueueNode * clone, * parent;
682
683     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
684         code = SYNC_FAILED;
685         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
686         goto done;
687     }
688
689     /* lookup clone's salvage scheduling node */
690     clone = LookupNodeByCommand(com->sop, NULL);
691     if (clone == NULL) {
692         code = SYNC_DENIED;
693         res->hdr->reason = SALVSYNC_REASON_ERROR;
694         goto done;
695     }
696
697     /* lookup parent's salvage scheduling node */
698     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
699     if (parent == NULL) {
700         if (AllocNode(&parent)) {
701             code = SYNC_DENIED;
702             res->hdr->reason = SYNC_REASON_NOMEM;
703             goto done;
704         }
705         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
706         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
707         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
708         AddNodeToHash(parent);
709     }
710
711     if (LinkNode(parent, clone)) {
712         code = SYNC_DENIED;
713         goto done;
714     }
715
716  done:
717     return code;
718 }
719
720 /**
721  * query the status of a volume salvage request.
722  *
723  * @param[in]  com   inbound command object
724  * @param[out] res   outbound response object
725  *
726  * @return operation status
727  *    @retval SYNC_OK success
728  *    @retval SYNC_FAILED malformed command packet
729  *
730  * @note this is a SALVSYNC protocol rpc handler
731  *
732  * @internal
733  */
734 static afs_int32
735 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
736 {
737     afs_int32 code = SYNC_OK;
738     struct SalvageQueueNode * node;
739
740     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
741         code = SYNC_FAILED;
742         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
743         goto done;
744     }
745
746     LookupNodeByCommand(com->sop, &node);
747
748     /* query whether a volume is done salvaging */
749     if (node == NULL) {
750         res->sop->state = SALVSYNC_STATE_UNKNOWN;
751         res->sop->prio = 0;
752     } else {
753         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
754         res->sop->state = node->state;
755         res->sop->prio = node->command.sop.prio;
756     }
757
758  done:
759     return code;
760 }
761
762 static void
763 SALVSYNC_Drop(osi_socket fd)
764 {
765     RemoveHandler(fd);
766 #ifdef AFS_NT40_ENV
767     closesocket(fd);
768 #else
769     close(fd);
770 #endif
771     AcceptOn();
772 }
773
774 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
775
776 static void
777 AcceptOn(void)
778 {
779     if (AcceptHandler == -1) {
780         assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
781         AcceptHandler = FindHandler(salvsync_server_state.fd);
782     }
783 }
784
785 static void
786 AcceptOff(void)
787 {
788     if (AcceptHandler != -1) {
789         assert(RemoveHandler(salvsync_server_state.fd));
790         AcceptHandler = -1;
791     }
792 }
793
794 /* The multiple FD handling code. */
795
796 static void
797 InitHandler(void)
798 {
799     int i;
800     ObtainWriteLock(&SALVSYNC_handler_lock);
801     for (i = 0; i < MAXHANDLERS; i++) {
802         HandlerFD[i] = -1;
803         HandlerProc[i] = NULL;
804     }
805     ReleaseWriteLock(&SALVSYNC_handler_lock);
806 }
807
808 static void
809 CallHandler(fd_set * fdsetp)
810 {
811     int i;
812     ObtainReadLock(&SALVSYNC_handler_lock);
813     for (i = 0; i < MAXHANDLERS; i++) {
814         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
815             ReleaseReadLock(&SALVSYNC_handler_lock);
816             (*HandlerProc[i]) (HandlerFD[i]);
817             ObtainReadLock(&SALVSYNC_handler_lock);
818         }
819     }
820     ReleaseReadLock(&SALVSYNC_handler_lock);
821 }
822
823 static int
824 AddHandler(osi_socket afd, void (*aproc) (int))
825 {
826     int i;
827     ObtainWriteLock(&SALVSYNC_handler_lock);
828     for (i = 0; i < MAXHANDLERS; i++)
829         if (HandlerFD[i] == -1)
830             break;
831     if (i >= MAXHANDLERS) {
832         ReleaseWriteLock(&SALVSYNC_handler_lock);
833         return 0;
834     }
835     HandlerFD[i] = afd;
836     HandlerProc[i] = aproc;
837     ReleaseWriteLock(&SALVSYNC_handler_lock);
838     return 1;
839 }
840
841 static int
842 FindHandler(osi_socket afd)
843 {
844     int i;
845     ObtainReadLock(&SALVSYNC_handler_lock);
846     for (i = 0; i < MAXHANDLERS; i++)
847         if (HandlerFD[i] == afd) {
848             ReleaseReadLock(&SALVSYNC_handler_lock);
849             return i;
850         }
851     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
852     assert(1 == 2);
853     return -1;                  /* satisfy compiler */
854 }
855
856 static int
857 FindHandler_r(osi_socket afd)
858 {
859     int i;
860     for (i = 0; i < MAXHANDLERS; i++)
861         if (HandlerFD[i] == afd) {
862             return i;
863         }
864     assert(1 == 2);
865     return -1;                  /* satisfy compiler */
866 }
867
868 static int
869 RemoveHandler(osi_socket afd)
870 {
871     ObtainWriteLock(&SALVSYNC_handler_lock);
872     HandlerFD[FindHandler_r(afd)] = -1;
873     ReleaseWriteLock(&SALVSYNC_handler_lock);
874     return 1;
875 }
876
877 static void
878 GetHandler(fd_set * fdsetp, int *maxfdp)
879 {
880     int i;
881     int maxfd = -1;
882     FD_ZERO(fdsetp);
883     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
884     for (i = 0; i < MAXHANDLERS; i++)
885         if (HandlerFD[i] != -1) {
886             FD_SET(HandlerFD[i], fdsetp);
887             if (maxfd < HandlerFD[i])
888                 maxfd = HandlerFD[i];
889         }
890     *maxfdp = maxfd;
891     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
892 }
893
894 /**
895  * allocate a salvage queue node.
896  *
897  * @param[out] node_out  address in which to store new node pointer
898  *
899  * @return operation status
900  *    @retval 0 success
901  *    @retval 1 failed to allocate node
902  *
903  * @internal
904  */
905 static int
906 AllocNode(struct SalvageQueueNode ** node_out)
907 {
908     int code = 0;
909     struct SalvageQueueNode * node;
910
911     *node_out = node = (struct SalvageQueueNode *)
912         malloc(sizeof(struct SalvageQueueNode));
913     if (node == NULL) {
914         code = 1;
915         goto done;
916     }
917
918     memset(node, 0, sizeof(struct SalvageQueueNode));
919     node->type = SALVSYNC_VOLGROUP_PARENT;
920     node->state = SALVSYNC_STATE_UNKNOWN;
921
922  done:
923     return code;
924 }
925
926 /**
927  * link a salvage queue node to its parent.
928  *
929  * @param[in] parent  pointer to queue node for parent of volume group
930  * @param[in] clone   pointer to queue node for a clone
931  *
932  * @return operation status
933  *    @retval 0 success
934  *    @retval 1 failure
935  *
936  * @internal
937  */
938 static int
939 LinkNode(struct SalvageQueueNode * parent,
940          struct SalvageQueueNode * clone)
941 {
942     int code = 0;
943     int idx;
944
945     /* check for attaching a clone to a clone */
946     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
947         code = 1;
948         goto done;
949     }
950
951     /* check for pre-existing registration and openings */
952     for (idx = 0; idx < VOLMAXTYPES; idx++) {
953         if (parent->volgroup.children[idx] == clone) {
954             goto linked;
955         }
956         if (parent->volgroup.children[idx] == NULL) {
957             break;
958         }
959     }
960     if (idx == VOLMAXTYPES) {
961         code = 1;
962         goto done;
963     }
964
965     /* link parent and child */
966     parent->volgroup.children[idx] = clone;
967     clone->type = SALVSYNC_VOLGROUP_CLONE;
968     clone->volgroup.parent = parent;
969
970
971  linked:
972     switch (clone->state) {
973     case SALVSYNC_STATE_QUEUED:
974         DeleteFromSalvageQueue(clone);
975
976     case SALVSYNC_STATE_SALVAGING:
977         switch (parent->state) {
978         case SALVSYNC_STATE_UNKNOWN:
979         case SALVSYNC_STATE_ERROR:
980         case SALVSYNC_STATE_DONE:
981             parent->command.sop.prio = clone->command.sop.prio;
982             AddToSalvageQueue(parent);
983             break;
984
985         case SALVSYNC_STATE_QUEUED:
986             if (clone->command.sop.prio) {
987                 parent->command.sop.prio += clone->command.sop.prio;
988                 UpdateCommandPrio(parent);
989             }
990             break;
991
992         default:
993             break;
994         }
995         break;
996
997     default:
998         break;
999     }
1000
1001  done:
1002     return code;
1003 }
1004
1005 static void
1006 HandlePrio(struct SalvageQueueNode * clone,
1007            struct SalvageQueueNode * node,
1008            afs_uint32 new_prio)
1009 {
1010     afs_uint32 delta;
1011
1012     switch (node->state) {
1013     case SALVSYNC_STATE_ERROR:
1014     case SALVSYNC_STATE_DONE:
1015     case SALVSYNC_STATE_UNKNOWN:
1016         node->command.sop.prio = 0;
1017         break;
1018     default:
1019         break;
1020     }
1021
1022     if (new_prio < clone->command.sop.prio) {
1023         /* strange. let's just set our delta to 1 */
1024         delta = 1;
1025     } else {
1026         delta = new_prio - clone->command.sop.prio;
1027     }
1028
1029     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1030         clone->command.sop.prio = new_prio;
1031     }
1032
1033     node->command.sop.prio += delta;
1034 }
1035
1036 static int
1037 AddToSalvageQueue(struct SalvageQueueNode * node)
1038 {
1039     afs_int32 id;
1040     struct SalvageQueueNode * last = NULL;
1041
1042     id = volutil_GetPartitionID(node->command.sop.partName);
1043     if (id < 0 || id > VOLMAXPARTS) {
1044         return 1;
1045     }
1046     if (!VGetPartitionById_r(id, 0)) {
1047         /* don't enqueue salvage requests for unmounted partitions */
1048         return 1;
1049     }
1050     if (queue_IsOnQueue(node)) {
1051         return 0;
1052     }
1053
1054     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1055         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1056     }
1057     queue_Append(&salvageQueue.part[id], node);
1058     salvageQueue.len[id]++;
1059     salvageQueue.total_len++;
1060     salvageQueue.last_insert = id;
1061     node->partition_id = id;
1062     node->state = SALVSYNC_STATE_QUEUED;
1063
1064     /* reorder, if necessary */
1065     if (last && last->command.sop.prio < node->command.sop.prio) {
1066         UpdateCommandPrio(node);
1067     }
1068
1069     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1070     return 0;
1071 }
1072
1073 static void
1074 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1075 {
1076     if (queue_IsOnQueue(node)) {
1077         queue_Remove(node);
1078         salvageQueue.len[node->partition_id]--;
1079         salvageQueue.total_len--;
1080         node->state = SALVSYNC_STATE_UNKNOWN;
1081         assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1082     }
1083 }
1084
1085 static void
1086 AddToPendingQueue(struct SalvageQueueNode * node)
1087 {
1088     queue_Append(&pendingQueue, node);
1089     pendingQueue.len++;
1090     node->state = SALVSYNC_STATE_SALVAGING;
1091     assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1092 }
1093
1094 static void
1095 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1096 {
1097     if (queue_IsOnQueue(node)) {
1098         queue_Remove(node);
1099         pendingQueue.len--;
1100         node->state = SALVSYNC_STATE_UNKNOWN;
1101         assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1102     }
1103 }
1104
1105 #if 0
1106 static struct SalvageQueueNode *
1107 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1108 {
1109     struct SalvageQueueNode * np, * nnp;
1110
1111     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1112         if ((np->command.sop.volume == qry->volume) &&
1113             !strncmp(np->command.sop.partName, qry->partName,
1114                      sizeof(qry->partName)))
1115             break;
1116     }
1117
1118     if (queue_IsEnd(&pendingQueue, np))
1119         np = NULL;
1120     return np;
1121 }
1122 #endif
1123
1124 static struct SalvageQueueNode *
1125 LookupPendingCommandByPid(int pid)
1126 {
1127     struct SalvageQueueNode * np, * nnp;
1128
1129     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1130         if (np->pid == pid)
1131             break;
1132     }
1133
1134     if (queue_IsEnd(&pendingQueue, np))
1135         np = NULL;
1136     return np;
1137 }
1138
1139
1140 /* raise the priority of a previously scheduled salvage */
1141 static void
1142 UpdateCommandPrio(struct SalvageQueueNode * node)
1143 {
1144     struct SalvageQueueNode *np, *nnp;
1145     afs_int32 id;
1146     afs_uint32 prio;
1147
1148     assert(queue_IsOnQueue(node));
1149
1150     prio = node->command.sop.prio;
1151     id = node->partition_id;
1152     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1153         queue_Remove(node);
1154         queue_Prepend(&salvageQueue.part[id], node);
1155     } else {
1156         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1157             if (np->command.sop.prio > prio)
1158                 break;
1159         }
1160         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1161             queue_Remove(node);
1162             queue_Prepend(&salvageQueue.part[id], node);
1163         } else if (node != np) {
1164             queue_Remove(node);
1165             queue_InsertAfter(np, node);
1166         }
1167     }
1168 }
1169
1170 /* this will need to be rearchitected if we ever want more than one thread
1171  * to wait for new salvage nodes */
1172 struct SalvageQueueNode *
1173 SALVSYNC_getWork(void)
1174 {
1175     int i;
1176     struct DiskPartition64 * dp = NULL, * fdp;
1177     static afs_int32 next_part_sched = 0;
1178     struct SalvageQueueNode *node = NULL;
1179
1180     VOL_LOCK;
1181
1182     /*
1183      * wait for work to be scheduled
1184      * if there are no disk partitions, just sit in this wait loop forever
1185      */
1186     while (!salvageQueue.total_len || !DiskPartitionList) {
1187         VOL_CV_WAIT(&salvageQueue.cv);
1188     }
1189
1190     /*
1191      * short circuit for simple case where only one partition has
1192      * scheduled salvages
1193      */
1194     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1195         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1196         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1197         goto have_node;
1198     }
1199
1200
1201     /*
1202      * ok, more than one partition has scheduled salvages.
1203      * now search for partitions with scheduled salvages, but no pending salvages.
1204      */
1205     dp = VGetPartitionById_r(next_part_sched, 0);
1206     if (!dp) {
1207         dp = DiskPartitionList;
1208     }
1209     fdp = dp;
1210
1211     for (i=0 ;
1212          !i || dp != fdp ;
1213          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1214         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1215             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1216             goto have_node;
1217         }
1218     }
1219
1220
1221     /*
1222      * all partitions with scheduled salvages have at least one pending.
1223      * now do an exhaustive search for a scheduled salvage.
1224      */
1225     dp = fdp;
1226
1227     for (i=0 ;
1228          !i || dp != fdp ;
1229          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1230         if (salvageQueue.len[dp->index]) {
1231             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1232             goto have_node;
1233         }
1234     }
1235
1236     /* we should never reach this line */
1237     assert(1==2);
1238
1239  have_node:
1240     assert(node != NULL);
1241     node->pid = 0;
1242     partition_salvaging[node->partition_id]++;
1243     DeleteFromSalvageQueue(node);
1244     AddToPendingQueue(node);
1245
1246     if (dp) {
1247         /* update next_part_sched field */
1248         if (dp->next) {
1249             next_part_sched = dp->next->index;
1250         } else if (DiskPartitionList) {
1251             next_part_sched = DiskPartitionList->index;
1252         } else {
1253             next_part_sched = -1;
1254         }
1255     }
1256
1257     VOL_UNLOCK;
1258     return node;
1259 }
1260
1261 /**
1262  * update internal scheduler state to reflect completion of a work unit.
1263  *
1264  * @param[in]  node    salvage queue node object pointer
1265  * @param[in]  result  worker process result code
1266  *
1267  * @post scheduler state is updated.
1268  *
1269  * @internal
1270  */
1271 static void
1272 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1273 {
1274     afs_int32 partid;
1275     int idx;
1276
1277     DeleteFromPendingQueue(node);
1278     partid = node->partition_id;
1279     if (partid >=0 && partid <= VOLMAXPARTS) {
1280         partition_salvaging[partid]--;
1281     }
1282     if (result == 0) {
1283         node->state = SALVSYNC_STATE_DONE;
1284     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1285         node->state = SALVSYNC_STATE_ERROR;
1286     }
1287
1288     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1289         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1290             if (node->volgroup.children[idx]) {
1291                 node->volgroup.children[idx]->state = node->state;
1292             }
1293         }
1294     }
1295 }
1296
1297 /**
1298  * check whether worker child failed.
1299  *
1300  * @param[in] status  status bitfield return by wait()
1301  *
1302  * @return boolean failure code
1303  *    @retval 0 child succeeded
1304  *    @retval 1 child failed
1305  *
1306  * @internal
1307  */
1308 static int
1309 ChildFailed(int status)
1310 {
1311     return (WCOREDUMP(status) ||
1312             WIFSIGNALED(status) ||
1313             ((WEXITSTATUS(status) != 0) &&
1314              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1315 }
1316
1317
1318 /**
1319  * notify salvsync scheduler of node completion, by child pid.
1320  *
1321  * @param[in]  pid     pid of worker child
1322  * @param[in]  status  worker status bitfield from wait()
1323  *
1324  * @post scheduler state is updated.
1325  *       if status code is a failure, fileserver notification was attempted
1326  *
1327  * @see SALVSYNC_doneWork_r
1328  */
1329 void
1330 SALVSYNC_doneWorkByPid(int pid, int status)
1331 {
1332     struct SalvageQueueNode * node;
1333     char partName[16];
1334     afs_uint32 volids[VOLMAXTYPES+1];
1335     unsigned int idx;
1336
1337     memset(volids, 0, sizeof(volids));
1338
1339     VOL_LOCK;
1340     node = LookupPendingCommandByPid(pid);
1341     if (node != NULL) {
1342         SALVSYNC_doneWork_r(node, status);
1343
1344         if (ChildFailed(status)) {
1345             /* populate volume id list for later processing outside the glock */
1346             volids[0] = node->command.sop.volume;
1347             strcpy(partName, node->command.sop.partName);
1348             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1349                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1350                     if (node->volgroup.children[idx]) {
1351                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1352                     }
1353                 }
1354             }
1355         }
1356     }
1357     VOL_UNLOCK;
1358
1359     /*
1360      * if necessary, notify fileserver of
1361      * failure to salvage volume group
1362      * [we cannot guarantee that the child made the
1363      *  appropriate notifications (e.g. SIGSEGV)]
1364      *  -- tkeiser 11/28/2007
1365      */
1366     if (ChildFailed(status)) {
1367         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1368             if (volids[idx]) {
1369                 FSYNC_VolOp(volids[idx],
1370                             partName,
1371                             FSYNC_VOL_FORCE_ERROR,
1372                             FSYNC_WHATEVER,
1373                             NULL);
1374             }
1375         }
1376     }
1377 }
1378
1379 #endif /* AFS_DEMAND_ATTACH_FS */