d9343bc8016a9ad78c1850ce8b8fb94fded7bf58
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 #include <afs/procmgmt.h>
30 #include <roken.h>
31
32 #include <stddef.h>
33
34 #include <afs/opr.h>
35 #include <afs/afsint.h>
36 #include "nfs.h"
37 #include <afs/errors.h>
38 #include "salvsync.h"
39 #include "lwp.h"
40 #include "lock.h"
41 #include <afs/afssyscalls.h>
42 #include "ihandle.h"
43 #include "vnode.h"
44 #include "volume.h"
45 #include "partition.h"
46 #include "common.h"
47 #include <rx/rx_queue.h>
48
49 #ifdef USE_UNIX_SOCKETS
50 #include <afs/afsutil.h>
51 #include <sys/un.h>
52 #endif
53
54 #ifndef WCOREDUMP
55 #define WCOREDUMP(x)    ((x) & 0200)
56 #endif
57
58 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
59                                  * move = dump+restore can run on single server */
60
61
62 /*
63  * This lock controls access to the handler array.
64  */
65 struct Lock SALVSYNC_handler_lock;
66
67
68 #ifdef AFS_DEMAND_ATTACH_FS
69 /*
70  * SALVSYNC is a feature specific to the demand attach fileserver
71  */
72
73 /* Forward declarations */
74 static void * SALVSYNC_syncThread(void *);
75 static void SALVSYNC_newconnection(osi_socket fd);
76 static void SALVSYNC_com(osi_socket fd);
77 static void SALVSYNC_Drop(osi_socket fd);
78 static void AcceptOn(void);
79 static void AcceptOff(void);
80 static void InitHandler(void);
81 static void CallHandler(fd_set * fdsetp);
82 static int AddHandler(osi_socket afd, void (*aproc) (int));
83 static int FindHandler(osi_socket afd);
84 static int FindHandler_r(osi_socket afd);
85 static int RemoveHandler(osi_socket afd);
86 static void GetHandler(fd_set * fdsetp, int *maxfdp);
87
88 static int AllocNode(struct SalvageQueueNode ** node);
89
90 static int AddToSalvageQueue(struct SalvageQueueNode * node);
91 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
92 static void AddToPendingQueue(struct SalvageQueueNode * node);
93 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
94 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
95 static void UpdateCommandPrio(struct SalvageQueueNode * node);
96 static void HandlePrio(struct SalvageQueueNode * clone,
97                        struct SalvageQueueNode * parent,
98                        afs_uint32 new_prio);
99
100 static int LinkNode(struct SalvageQueueNode * parent,
101                     struct SalvageQueueNode * clone);
102
103 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
104                                             struct SalvageQueueNode ** parent);
105 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
106                                                      struct SalvageQueueNode ** parent);
107 static void AddNodeToHash(struct SalvageQueueNode * node);
108
109 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
110 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
111 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
112 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
113 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
114
115
116 extern int LogLevel;
117 extern int VInit;
118 extern pthread_mutex_t vol_salvsync_mutex;
119
120 /**
121  * salvsync server socket handle.
122  */
123 static SYNC_server_state_t salvsync_server_state =
124     { OSI_NULLSOCKET,                       /* file descriptor */
125       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
126       SALVSYNC_PROTO_VERSION,   /* protocol version */
127       5,                        /* bind() retry limit */
128       100,                      /* listen() queue depth */
129       "SALVSYNC",               /* protocol name string */
130     };
131
132
133 /**
134  * queue of all volumes waiting to be salvaged.
135  */
136 struct SalvageQueue {
137     volatile int total_len;
138     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
139     volatile int len[VOLMAXPARTS+1];
140     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
141     pthread_cond_t cv;
142 };
143 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
144
145 /**
146  * queue of all volumes currently being salvaged.
147  */
148 struct QueueHead {
149     volatile struct rx_queue q;  /**< queue of salvages in progress */
150     volatile int len;            /**< length of in-progress queue */
151     pthread_cond_t queue_change_cv;
152 };
153 static struct QueueHead pendingQueue;  /* volumes being salvaged */
154
155 /* XXX
156  * whether a partition has a salvage in progress
157  *
158  * the salvager code only permits one salvage per partition at a time
159  *
160  * the following hack tries to keep salvaged parallelism high by
161  * only permitting one salvage dispatch per partition at a time
162  *
163  * unfortunately, the parallel salvager currently
164  * has a rather braindead routine that won't permit
165  * multiple salvages on the same "device".  this
166  * function happens to break pretty badly on lvm, raid luns, etc.
167  *
168  * this hack isn't good enough to stop the device limiting code from
169  * crippling performance.  someday that code needs to be rewritten
170  */
171 static int partition_salvaging[VOLMAXPARTS+1];
172
173 static int HandlerFD[MAXHANDLERS];
174 static void (*HandlerProc[MAXHANDLERS]) (int);
175
176 #define VSHASH_SIZE 64
177 #define VSHASH_MASK (VSHASH_SIZE-1)
178 #define VSHASH(vid) ((vid)&VSHASH_MASK)
179
180 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
181
182 static struct SalvageQueueNode *
183 LookupNode(afs_uint32 vid, char * partName,
184            struct SalvageQueueNode ** parent)
185 {
186     struct rx_queue *qp, *nqp;
187     struct SalvageQueueNode *vsp = NULL;
188     int idx = VSHASH(vid);
189
190     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
191         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
192         if ((vsp->command.sop.volume == vid) &&
193             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
194             break;
195         }
196     }
197
198     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
199         vsp = NULL;
200     }
201
202     if (parent) {
203         if (vsp) {
204             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
205                 vsp->volgroup.parent : vsp;
206         } else {
207             *parent = NULL;
208         }
209     }
210
211     return vsp;
212 }
213
214 static struct SalvageQueueNode *
215 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
216                     struct SalvageQueueNode ** parent)
217 {
218     return LookupNode(qry->volume, qry->partName, parent);
219 }
220
221 static void
222 AddNodeToHash(struct SalvageQueueNode * node)
223 {
224     int idx = VSHASH(node->command.sop.volume);
225
226     if (queue_IsOnQueue(&node->hash_chain)) {
227         return;
228     }
229
230     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
231     SalvageHashTable[idx].len++;
232 }
233
234 #if 0
235 static void
236 DeleteNodeFromHash(struct SalvageQueueNode * node)
237 {
238     int idx = VSHASH(node->command.sop.volume);
239
240     if (queue_IsNotOnQueue(&node->hash_chain)) {
241         return;
242     }
243
244     queue_Remove(&node->hash_chain);
245     SalvageHashTable[idx].len--;
246 }
247 #endif
248
249 void
250 SALVSYNC_salvInit(void)
251 {
252     int i;
253     pthread_t tid;
254     pthread_attr_t tattr;
255
256     /* initialize the queues */
257     Lock_Init(&SALVSYNC_handler_lock);
258     CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
259     for (i = 0; i <= VOLMAXPARTS; i++) {
260         queue_Init(&salvageQueue.part[i]);
261         salvageQueue.len[i] = 0;
262     }
263     CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
264     queue_Init(&pendingQueue);
265     salvageQueue.total_len = pendingQueue.len = 0;
266     salvageQueue.last_insert = -1;
267     memset(partition_salvaging, 0, sizeof(partition_salvaging));
268
269     for (i = 0; i < VSHASH_SIZE; i++) {
270         CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
271         SalvageHashTable[i].len = 0;
272         queue_Init(&SalvageHashTable[i]);
273     }
274
275     /* start the salvsync thread */
276     osi_Assert(pthread_attr_init(&tattr) == 0);
277     osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
278     osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
279 }
280
281 static void
282 CleanFDs(void)
283 {
284     int i;
285     for (i = 0; i < MAXHANDLERS; ++i) {
286         if (HandlerFD[i] >= 0) {
287             SALVSYNC_Drop(HandlerFD[i]);
288         }
289     }
290
291     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
292      * have a handler */
293     close(salvsync_server_state.fd);
294     salvsync_server_state.fd = OSI_NULLSOCKET;
295 }
296
297 static fd_set SALVSYNC_readfds;
298
299 static void *
300 SALVSYNC_syncThread(void * args)
301 {
302     int code;
303     SYNC_server_state_t * state = &salvsync_server_state;
304
305     /* when we fork, the child needs to close the salvsync server sockets,
306      * otherwise, it may get salvsync requests, instead of the parent
307      * salvageserver */
308     osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
309
310     SYNC_getAddr(&state->endpoint, &state->addr);
311     SYNC_cleanupSock(state);
312
313 #ifndef AFS_NT40_ENV
314     (void)signal(SIGPIPE, SIG_IGN);
315 #endif
316
317     state->fd = SYNC_getSock(&state->endpoint);
318     code = SYNC_bindSock(state);
319     osi_Assert(!code);
320
321     InitHandler();
322     AcceptOn();
323
324     for (;;) {
325         int maxfd;
326         struct timeval s_timeout;
327         GetHandler(&SALVSYNC_readfds, &maxfd);
328         s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
329         s_timeout.tv_usec = 0;
330         /* Note: check for >= 1 below is essential since IOMGR_select
331          * doesn't have exactly same semantics as select.
332          */
333         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
334             CallHandler(&SALVSYNC_readfds);
335     }
336
337     return NULL;
338 }
339
340 static void
341 SALVSYNC_newconnection(int afd)
342 {
343 #ifdef USE_UNIX_SOCKETS
344     struct sockaddr_un other;
345 #else  /* USE_UNIX_SOCKETS */
346     struct sockaddr_in other;
347 #endif
348     int fd;
349     socklen_t junk;
350
351     junk = sizeof(other);
352     fd = accept(afd, (struct sockaddr *)&other, &junk);
353     if (fd == OSI_NULLSOCKET) {
354         osi_Panic("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
355     } else if (!AddHandler(fd, SALVSYNC_com)) {
356         AcceptOff();
357         osi_Assert(AddHandler(fd, SALVSYNC_com));
358     }
359 }
360
361 /* this function processes commands from an salvsync file descriptor (fd) */
362 static afs_int32 SALV_cnt = 0;
363 static void
364 SALVSYNC_com(osi_socket fd)
365 {
366     SYNC_command com;
367     SYNC_response res;
368     SALVSYNC_response_hdr sres_hdr;
369     SALVSYNC_command scom;
370     SALVSYNC_response sres;
371     SYNC_PROTO_BUF_DECL(buf);
372
373     memset(&com, 0, sizeof(com));
374     memset(&res, 0, sizeof(res));
375     memset(&scom, 0, sizeof(scom));
376     memset(&sres, 0, sizeof(sres));
377     memset(&sres_hdr, 0, sizeof(sres_hdr));
378
379     com.payload.buf = (void *)buf;
380     com.payload.len = SYNC_PROTO_MAX_LEN;
381     res.payload.buf = (void *) &sres_hdr;
382     res.payload.len = sizeof(sres_hdr);
383     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
384     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
385
386     scom.hdr = &com.hdr;
387     scom.sop = (SALVSYNC_command_hdr *) buf;
388     scom.com = &com;
389     sres.hdr = &res.hdr;
390     sres.sop = &sres_hdr;
391     sres.res = &res;
392
393     SALV_cnt++;
394     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
395         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
396         SALVSYNC_Drop(fd);
397         return;
398     }
399
400     if (com.recv_len < sizeof(com.hdr)) {
401         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
402         res.hdr.response = SYNC_COM_ERROR;
403         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
404         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
405         goto respond;
406     }
407
408     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
409         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
410         res.hdr.response = SYNC_COM_ERROR;
411         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
412         goto respond;
413     }
414
415     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
416         res.hdr.response = SYNC_OK;
417         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
418
419         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
420          * never wait for a response. */
421         goto done;
422     }
423
424     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
425         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
426         res.hdr.response = SYNC_COM_ERROR;
427         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
428         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
429         goto respond;
430     }
431
432     res.hdr.com_seq = com.hdr.com_seq;
433
434     VOL_LOCK;
435     switch (com.hdr.command) {
436     case SALVSYNC_NOP:
437         break;
438     case SALVSYNC_SALVAGE:
439     case SALVSYNC_RAISEPRIO:
440         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
441         break;
442     case SALVSYNC_CANCEL:
443         /* cancel a salvage */
444         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
445         break;
446     case SALVSYNC_CANCELALL:
447         /* cancel all queued salvages */
448         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
449         break;
450     case SALVSYNC_QUERY:
451         /* query whether a volume is done salvaging */
452         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
453         break;
454     case SALVSYNC_OP_LINK:
455         /* link a clone to its parent in the scheduler */
456         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
457         break;
458     default:
459         res.hdr.response = SYNC_BAD_COMMAND;
460         break;
461     }
462
463     sres_hdr.sq_len = salvageQueue.total_len;
464     sres_hdr.pq_len = pendingQueue.len;
465     VOL_UNLOCK;
466
467  respond:
468     SYNC_putRes(&salvsync_server_state, fd, &res);
469
470  done:
471     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
472         SALVSYNC_Drop(fd);
473     }
474 }
475
476 /**
477  * request that a volume be salvaged.
478  *
479  * @param[in]  com  inbound command object
480  * @param[out] res  outbound response object
481  *
482  * @return operation status
483  *    @retval SYNC_OK success
484  *    @retval SYNC_DENIED failed to enqueue request
485  *    @retval SYNC_FAILED malformed command packet
486  *
487  * @note this is a SALVSYNC protocol rpc handler
488  *
489  * @internal
490  *
491  * @post the volume is enqueued in the to-be-salvaged queue.
492  *       if the volume was already in the salvage queue, its
493  *       priority (and thus its location in the queue) are
494  *       updated.
495  */
496 static afs_int32
497 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
498 {
499     afs_int32 code = SYNC_OK;
500     struct SalvageQueueNode * node, * clone;
501     int hash = 0;
502
503     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
504         code = SYNC_FAILED;
505         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
506         goto done;
507     }
508
509     clone = LookupNodeByCommand(com->sop, &node);
510
511     if (node == NULL) {
512         if (AllocNode(&node)) {
513             code = SYNC_DENIED;
514             res->hdr->reason = SYNC_REASON_NOMEM;
515             goto done;
516         }
517         clone = node;
518         hash = 1;
519     }
520
521     HandlePrio(clone, node, com->sop->prio);
522
523     switch (node->state) {
524     case SALVSYNC_STATE_QUEUED:
525         UpdateCommandPrio(node);
526         break;
527
528     case SALVSYNC_STATE_ERROR:
529     case SALVSYNC_STATE_DONE:
530     case SALVSYNC_STATE_UNKNOWN:
531         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
532         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
533
534         /*
535          * make sure volgroup parent partition path is kept coherent
536          *
537          * If we ever want to support non-COW clones on a machine holding
538          * the RW site, please note that this code does not work under the
539          * conditions where someone zaps a COW clone on partition X, and
540          * subsequently creates a full clone on partition Y -- we'd need
541          * an inverse to SALVSYNC_com_Link.
542          *  -- tkeiser 11/28/2007
543          */
544         strcpy(node->command.sop.partName, com->sop->partName);
545
546         if (AddToSalvageQueue(node)) {
547             code = SYNC_DENIED;
548         }
549         break;
550
551     default:
552         break;
553     }
554
555     if (hash) {
556         AddNodeToHash(node);
557     }
558
559     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
560     res->sop->state = node->state;
561     res->sop->prio = node->command.sop.prio;
562
563  done:
564     return code;
565 }
566
567 /**
568  * cancel a pending salvage request.
569  *
570  * @param[in]  com  inbound command object
571  * @param[out] res  outbound response object
572  *
573  * @return operation status
574  *    @retval SYNC_OK success
575  *    @retval SYNC_FAILED malformed command packet
576  *
577  * @note this is a SALVSYNC protocol rpc handler
578  *
579  * @internal
580  */
581 static afs_int32
582 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
583 {
584     afs_int32 code = SYNC_OK;
585     struct SalvageQueueNode * node;
586
587     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
588         code = SYNC_FAILED;
589         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
590         goto done;
591     }
592
593     node = LookupNodeByCommand(com->sop, NULL);
594
595     if (node == NULL) {
596         res->sop->state = SALVSYNC_STATE_UNKNOWN;
597         res->sop->prio = 0;
598     } else {
599         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
600         res->sop->prio = node->command.sop.prio;
601         res->sop->state = node->state;
602         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
603             (node->state == SALVSYNC_STATE_QUEUED)) {
604             DeleteFromSalvageQueue(node);
605         }
606     }
607
608  done:
609     return code;
610 }
611
612 /**
613  * cancel all pending salvage requests.
614  *
615  * @param[in]  com  incoming command object
616  * @param[out] res  outbound response object
617  *
618  * @return operation status
619  *    @retval SYNC_OK success
620  *
621  * @note this is a SALVSYNC protocol rpc handler
622  *
623  * @internal
624  */
625 static afs_int32
626 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
627 {
628     struct SalvageQueueNode * np, *nnp;
629     struct DiskPartition64 * dp;
630
631     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
632         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
633             DeleteFromSalvageQueue(np);
634         }
635     }
636
637     return SYNC_OK;
638 }
639
640 /**
641  * link a queue node for a clone to its parent volume.
642  *
643  * @param[in]  com   inbound command object
644  * @param[out] res   outbound response object
645  *
646  * @return operation status
647  *    @retval SYNC_OK success
648  *    @retval SYNC_FAILED malformed command packet
649  *    @retval SYNC_DENIED the request could not be completed
650  *
651  * @note this is a SALVSYNC protocol rpc handler
652  *
653  * @post the requested volume is marked as a child of another volume.
654  *       thus, future salvage requests for this volume will result in the
655  *       parent of the volume group being scheduled for salvage instead
656  *       of this clone.
657  *
658  * @internal
659  */
660 static afs_int32
661 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
662 {
663     afs_int32 code = SYNC_OK;
664     struct SalvageQueueNode * clone, * parent;
665
666     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
667         code = SYNC_FAILED;
668         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
669         goto done;
670     }
671
672     /* lookup clone's salvage scheduling node */
673     clone = LookupNodeByCommand(com->sop, NULL);
674     if (clone == NULL) {
675         code = SYNC_DENIED;
676         res->hdr->reason = SALVSYNC_REASON_ERROR;
677         goto done;
678     }
679
680     /* lookup parent's salvage scheduling node */
681     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
682     if (parent == NULL) {
683         if (AllocNode(&parent)) {
684             code = SYNC_DENIED;
685             res->hdr->reason = SYNC_REASON_NOMEM;
686             goto done;
687         }
688         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
689         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
690         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
691         AddNodeToHash(parent);
692     }
693
694     if (LinkNode(parent, clone)) {
695         code = SYNC_DENIED;
696         goto done;
697     }
698
699  done:
700     return code;
701 }
702
703 /**
704  * query the status of a volume salvage request.
705  *
706  * @param[in]  com   inbound command object
707  * @param[out] res   outbound response object
708  *
709  * @return operation status
710  *    @retval SYNC_OK success
711  *    @retval SYNC_FAILED malformed command packet
712  *
713  * @note this is a SALVSYNC protocol rpc handler
714  *
715  * @internal
716  */
717 static afs_int32
718 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
719 {
720     afs_int32 code = SYNC_OK;
721     struct SalvageQueueNode * node;
722
723     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
724         code = SYNC_FAILED;
725         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
726         goto done;
727     }
728
729     LookupNodeByCommand(com->sop, &node);
730
731     /* query whether a volume is done salvaging */
732     if (node == NULL) {
733         res->sop->state = SALVSYNC_STATE_UNKNOWN;
734         res->sop->prio = 0;
735     } else {
736         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
737         res->sop->state = node->state;
738         res->sop->prio = node->command.sop.prio;
739     }
740
741  done:
742     return code;
743 }
744
745 static void
746 SALVSYNC_Drop(osi_socket fd)
747 {
748     RemoveHandler(fd);
749     rk_closesocket(fd);
750     AcceptOn();
751 }
752
753 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
754
755 static void
756 AcceptOn(void)
757 {
758     if (AcceptHandler == -1) {
759         osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
760         AcceptHandler = FindHandler(salvsync_server_state.fd);
761     }
762 }
763
764 static void
765 AcceptOff(void)
766 {
767     if (AcceptHandler != -1) {
768         osi_Assert(RemoveHandler(salvsync_server_state.fd));
769         AcceptHandler = -1;
770     }
771 }
772
773 /* The multiple FD handling code. */
774
775 static void
776 InitHandler(void)
777 {
778     int i;
779     ObtainWriteLock(&SALVSYNC_handler_lock);
780     for (i = 0; i < MAXHANDLERS; i++) {
781         HandlerFD[i] = OSI_NULLSOCKET;
782         HandlerProc[i] = NULL;
783     }
784     ReleaseWriteLock(&SALVSYNC_handler_lock);
785 }
786
787 static void
788 CallHandler(fd_set * fdsetp)
789 {
790     int i;
791     ObtainReadLock(&SALVSYNC_handler_lock);
792     for (i = 0; i < MAXHANDLERS; i++) {
793         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
794             ReleaseReadLock(&SALVSYNC_handler_lock);
795             (*HandlerProc[i]) (HandlerFD[i]);
796             ObtainReadLock(&SALVSYNC_handler_lock);
797         }
798     }
799     ReleaseReadLock(&SALVSYNC_handler_lock);
800 }
801
802 static int
803 AddHandler(osi_socket afd, void (*aproc) (int))
804 {
805     int i;
806     ObtainWriteLock(&SALVSYNC_handler_lock);
807     for (i = 0; i < MAXHANDLERS; i++)
808         if (HandlerFD[i] == OSI_NULLSOCKET)
809             break;
810     if (i >= MAXHANDLERS) {
811         ReleaseWriteLock(&SALVSYNC_handler_lock);
812         return 0;
813     }
814     HandlerFD[i] = afd;
815     HandlerProc[i] = aproc;
816     ReleaseWriteLock(&SALVSYNC_handler_lock);
817     return 1;
818 }
819
820 static int
821 FindHandler(osi_socket afd)
822 {
823     int i;
824     ObtainReadLock(&SALVSYNC_handler_lock);
825     for (i = 0; i < MAXHANDLERS; i++)
826         if (HandlerFD[i] == afd) {
827             ReleaseReadLock(&SALVSYNC_handler_lock);
828             return i;
829         }
830     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
831     osi_Panic("Failed to find handler\n");
832     return -1;                  /* satisfy compiler */
833 }
834
835 static int
836 FindHandler_r(osi_socket afd)
837 {
838     int i;
839     for (i = 0; i < MAXHANDLERS; i++)
840         if (HandlerFD[i] == afd) {
841             return i;
842         }
843     osi_Panic("Failed to find handler\n");
844     return -1;                  /* satisfy compiler */
845 }
846
847 static int
848 RemoveHandler(osi_socket afd)
849 {
850     ObtainWriteLock(&SALVSYNC_handler_lock);
851     HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
852     ReleaseWriteLock(&SALVSYNC_handler_lock);
853     return 1;
854 }
855
856 static void
857 GetHandler(fd_set * fdsetp, int *maxfdp)
858 {
859     int i;
860     int maxfd = -1;
861     FD_ZERO(fdsetp);
862     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
863     for (i = 0; i < MAXHANDLERS; i++)
864         if (HandlerFD[i] != OSI_NULLSOCKET) {
865             FD_SET(HandlerFD[i], fdsetp);
866 #ifndef AFS_NT40_ENV
867             /* On Windows the nfds parameter to select() is ignored */
868             if (maxfd < HandlerFD[i] || maxfd == (int)-1)
869                 maxfd = HandlerFD[i];
870 #endif
871         }
872     *maxfdp = maxfd;
873     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
874 }
875
876 /**
877  * allocate a salvage queue node.
878  *
879  * @param[out] node_out  address in which to store new node pointer
880  *
881  * @return operation status
882  *    @retval 0 success
883  *    @retval 1 failed to allocate node
884  *
885  * @internal
886  */
887 static int
888 AllocNode(struct SalvageQueueNode ** node_out)
889 {
890     int code = 0;
891     struct SalvageQueueNode * node;
892
893     *node_out = node = calloc(1, sizeof(struct SalvageQueueNode));
894     if (node == NULL) {
895         code = 1;
896         goto done;
897     }
898
899     node->type = SALVSYNC_VOLGROUP_PARENT;
900     node->state = SALVSYNC_STATE_UNKNOWN;
901
902  done:
903     return code;
904 }
905
906 /**
907  * link a salvage queue node to its parent.
908  *
909  * @param[in] parent  pointer to queue node for parent of volume group
910  * @param[in] clone   pointer to queue node for a clone
911  *
912  * @return operation status
913  *    @retval 0 success
914  *    @retval 1 failure
915  *
916  * @internal
917  */
918 static int
919 LinkNode(struct SalvageQueueNode * parent,
920          struct SalvageQueueNode * clone)
921 {
922     int code = 0;
923     int idx;
924
925     /* check for attaching a clone to a clone */
926     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
927         code = 1;
928         goto done;
929     }
930
931     /* check for pre-existing registration and openings */
932     for (idx = 0; idx < VOLMAXTYPES; idx++) {
933         if (parent->volgroup.children[idx] == clone) {
934             goto linked;
935         }
936         if (parent->volgroup.children[idx] == NULL) {
937             break;
938         }
939     }
940     if (idx == VOLMAXTYPES) {
941         code = 1;
942         goto done;
943     }
944
945     /* link parent and child */
946     parent->volgroup.children[idx] = clone;
947     clone->type = SALVSYNC_VOLGROUP_CLONE;
948     clone->volgroup.parent = parent;
949
950
951  linked:
952     switch (clone->state) {
953     case SALVSYNC_STATE_QUEUED:
954         DeleteFromSalvageQueue(clone);
955
956     case SALVSYNC_STATE_SALVAGING:
957         switch (parent->state) {
958         case SALVSYNC_STATE_UNKNOWN:
959         case SALVSYNC_STATE_ERROR:
960         case SALVSYNC_STATE_DONE:
961             parent->command.sop.prio = clone->command.sop.prio;
962             AddToSalvageQueue(parent);
963             break;
964
965         case SALVSYNC_STATE_QUEUED:
966             if (clone->command.sop.prio) {
967                 parent->command.sop.prio += clone->command.sop.prio;
968                 UpdateCommandPrio(parent);
969             }
970             break;
971
972         default:
973             break;
974         }
975         break;
976
977     default:
978         break;
979     }
980
981  done:
982     return code;
983 }
984
985 static void
986 HandlePrio(struct SalvageQueueNode * clone,
987            struct SalvageQueueNode * node,
988            afs_uint32 new_prio)
989 {
990     afs_uint32 delta;
991
992     switch (node->state) {
993     case SALVSYNC_STATE_ERROR:
994     case SALVSYNC_STATE_DONE:
995     case SALVSYNC_STATE_UNKNOWN:
996         node->command.sop.prio = 0;
997         break;
998     default:
999         break;
1000     }
1001
1002     if (new_prio < clone->command.sop.prio) {
1003         /* strange. let's just set our delta to 1 */
1004         delta = 1;
1005     } else {
1006         delta = new_prio - clone->command.sop.prio;
1007     }
1008
1009     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1010         clone->command.sop.prio = new_prio;
1011     }
1012
1013     node->command.sop.prio += delta;
1014 }
1015
1016 static int
1017 AddToSalvageQueue(struct SalvageQueueNode * node)
1018 {
1019     afs_int32 id;
1020     struct SalvageQueueNode * last = NULL;
1021
1022     id = volutil_GetPartitionID(node->command.sop.partName);
1023     if (id < 0 || id > VOLMAXPARTS) {
1024         return 1;
1025     }
1026     if (!VGetPartitionById_r(id, 0)) {
1027         /* don't enqueue salvage requests for unmounted partitions */
1028         return 1;
1029     }
1030     if (queue_IsOnQueue(node)) {
1031         return 0;
1032     }
1033
1034     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1035         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1036     }
1037     queue_Append(&salvageQueue.part[id], node);
1038     salvageQueue.len[id]++;
1039     salvageQueue.total_len++;
1040     salvageQueue.last_insert = id;
1041     node->partition_id = id;
1042     node->state = SALVSYNC_STATE_QUEUED;
1043
1044     /* reorder, if necessary */
1045     if (last && last->command.sop.prio < node->command.sop.prio) {
1046         UpdateCommandPrio(node);
1047     }
1048
1049     CV_BROADCAST(&salvageQueue.cv);
1050     return 0;
1051 }
1052
1053 static void
1054 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1055 {
1056     if (queue_IsOnQueue(node)) {
1057         queue_Remove(node);
1058         salvageQueue.len[node->partition_id]--;
1059         salvageQueue.total_len--;
1060         node->state = SALVSYNC_STATE_UNKNOWN;
1061         CV_BROADCAST(&salvageQueue.cv);
1062     }
1063 }
1064
1065 static void
1066 AddToPendingQueue(struct SalvageQueueNode * node)
1067 {
1068     queue_Append(&pendingQueue, node);
1069     pendingQueue.len++;
1070     node->state = SALVSYNC_STATE_SALVAGING;
1071     CV_BROADCAST(&pendingQueue.queue_change_cv);
1072 }
1073
1074 static void
1075 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1076 {
1077     if (queue_IsOnQueue(node)) {
1078         queue_Remove(node);
1079         pendingQueue.len--;
1080         node->state = SALVSYNC_STATE_UNKNOWN;
1081         CV_BROADCAST(&pendingQueue.queue_change_cv);
1082     }
1083 }
1084
1085 #if 0
1086 static struct SalvageQueueNode *
1087 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1088 {
1089     struct SalvageQueueNode * np, * nnp;
1090
1091     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1092         if ((np->command.sop.volume == qry->volume) &&
1093             !strncmp(np->command.sop.partName, qry->partName,
1094                      sizeof(qry->partName)))
1095             break;
1096     }
1097
1098     if (queue_IsEnd(&pendingQueue, np))
1099         np = NULL;
1100     return np;
1101 }
1102 #endif
1103
1104 static struct SalvageQueueNode *
1105 LookupPendingCommandByPid(int pid)
1106 {
1107     struct SalvageQueueNode * np, * nnp;
1108
1109     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1110         if (np->pid == pid)
1111             break;
1112     }
1113
1114     if (queue_IsEnd(&pendingQueue, np))
1115         np = NULL;
1116     return np;
1117 }
1118
1119
1120 /* raise the priority of a previously scheduled salvage */
1121 static void
1122 UpdateCommandPrio(struct SalvageQueueNode * node)
1123 {
1124     struct SalvageQueueNode *np, *nnp;
1125     afs_int32 id;
1126     afs_uint32 prio;
1127
1128     osi_Assert(queue_IsOnQueue(node));
1129
1130     prio = node->command.sop.prio;
1131     id = node->partition_id;
1132     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1133         queue_Remove(node);
1134         queue_Prepend(&salvageQueue.part[id], node);
1135     } else {
1136         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1137             if (np->command.sop.prio > prio)
1138                 break;
1139         }
1140         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1141             queue_Remove(node);
1142             queue_Prepend(&salvageQueue.part[id], node);
1143         } else if (node != np) {
1144             queue_Remove(node);
1145             queue_InsertAfter(np, node);
1146         }
1147     }
1148 }
1149
1150 /* this will need to be rearchitected if we ever want more than one thread
1151  * to wait for new salvage nodes */
1152 struct SalvageQueueNode *
1153 SALVSYNC_getWork(void)
1154 {
1155     int i;
1156     struct DiskPartition64 * dp = NULL, * fdp;
1157     static afs_int32 next_part_sched = 0;
1158     struct SalvageQueueNode *node = NULL;
1159
1160     VOL_LOCK;
1161
1162     /*
1163      * wait for work to be scheduled
1164      * if there are no disk partitions, just sit in this wait loop forever
1165      */
1166     while (!salvageQueue.total_len || !DiskPartitionList) {
1167         VOL_CV_WAIT(&salvageQueue.cv);
1168     }
1169
1170     /*
1171      * short circuit for simple case where only one partition has
1172      * scheduled salvages
1173      */
1174     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1175         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1176         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1177         goto have_node;
1178     }
1179
1180
1181     /*
1182      * ok, more than one partition has scheduled salvages.
1183      * now search for partitions with scheduled salvages, but no pending salvages.
1184      */
1185     dp = VGetPartitionById_r(next_part_sched, 0);
1186     if (!dp) {
1187         dp = DiskPartitionList;
1188     }
1189     fdp = dp;
1190
1191     for (i=0 ;
1192          !i || dp != fdp ;
1193          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1194         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1195             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1196             goto have_node;
1197         }
1198     }
1199
1200
1201     /*
1202      * all partitions with scheduled salvages have at least one pending.
1203      * now do an exhaustive search for a scheduled salvage.
1204      */
1205     dp = fdp;
1206
1207     for (i=0 ;
1208          !i || dp != fdp ;
1209          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1210         if (salvageQueue.len[dp->index]) {
1211             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1212             goto have_node;
1213         }
1214     }
1215
1216     /* we should never reach this line */
1217     osi_Panic("Node not found\n");
1218
1219  have_node:
1220     osi_Assert(node != NULL);
1221     node->pid = 0;
1222     partition_salvaging[node->partition_id]++;
1223     DeleteFromSalvageQueue(node);
1224     AddToPendingQueue(node);
1225
1226     if (dp) {
1227         /* update next_part_sched field */
1228         if (dp->next) {
1229             next_part_sched = dp->next->index;
1230         } else if (DiskPartitionList) {
1231             next_part_sched = DiskPartitionList->index;
1232         } else {
1233             next_part_sched = -1;
1234         }
1235     }
1236
1237     VOL_UNLOCK;
1238     return node;
1239 }
1240
1241 /**
1242  * update internal scheduler state to reflect completion of a work unit.
1243  *
1244  * @param[in]  node    salvage queue node object pointer
1245  * @param[in]  result  worker process result code
1246  *
1247  * @post scheduler state is updated.
1248  *
1249  * @internal
1250  */
1251 static void
1252 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1253 {
1254     afs_int32 partid;
1255     int idx;
1256
1257     DeleteFromPendingQueue(node);
1258     partid = node->partition_id;
1259     if (partid >=0 && partid <= VOLMAXPARTS) {
1260         partition_salvaging[partid]--;
1261     }
1262     if (result == 0) {
1263         node->state = SALVSYNC_STATE_DONE;
1264     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1265         node->state = SALVSYNC_STATE_ERROR;
1266     }
1267
1268     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1269         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1270             if (node->volgroup.children[idx]) {
1271                 node->volgroup.children[idx]->state = node->state;
1272             }
1273         }
1274     }
1275 }
1276
1277 /**
1278  * check whether worker child failed.
1279  *
1280  * @param[in] status  status bitfield return by wait()
1281  *
1282  * @return boolean failure code
1283  *    @retval 0 child succeeded
1284  *    @retval 1 child failed
1285  *
1286  * @internal
1287  */
1288 static int
1289 ChildFailed(int status)
1290 {
1291     return (WCOREDUMP(status) ||
1292             WIFSIGNALED(status) ||
1293             ((WEXITSTATUS(status) != 0) &&
1294              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1295 }
1296
1297
1298 /**
1299  * notify salvsync scheduler of node completion, by child pid.
1300  *
1301  * @param[in]  pid     pid of worker child
1302  * @param[in]  status  worker status bitfield from wait()
1303  *
1304  * @post scheduler state is updated.
1305  *       if status code is a failure, fileserver notification was attempted
1306  *
1307  * @see SALVSYNC_doneWork_r
1308  */
1309 void
1310 SALVSYNC_doneWorkByPid(int pid, int status)
1311 {
1312     struct SalvageQueueNode * node;
1313     char partName[16];
1314     afs_uint32 volids[VOLMAXTYPES+1];
1315     unsigned int idx;
1316
1317     memset(volids, 0, sizeof(volids));
1318
1319     VOL_LOCK;
1320     node = LookupPendingCommandByPid(pid);
1321     if (node != NULL) {
1322         SALVSYNC_doneWork_r(node, status);
1323
1324         if (ChildFailed(status)) {
1325             /* populate volume id list for later processing outside the glock */
1326             volids[0] = node->command.sop.volume;
1327             strcpy(partName, node->command.sop.partName);
1328             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1329                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1330                     if (node->volgroup.children[idx]) {
1331                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1332                     }
1333                 }
1334             }
1335         }
1336     }
1337     VOL_UNLOCK;
1338
1339     /*
1340      * if necessary, notify fileserver of
1341      * failure to salvage volume group
1342      * [we cannot guarantee that the child made the
1343      *  appropriate notifications (e.g. SIGSEGV)]
1344      *  -- tkeiser 11/28/2007
1345      */
1346     if (ChildFailed(status)) {
1347         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1348             if (volids[idx]) {
1349                 FSYNC_VolOp(volids[idx],
1350                             partName,
1351                             FSYNC_VOL_FORCE_ERROR,
1352                             FSYNC_WHATEVER,
1353                             NULL);
1354             }
1355         }
1356     }
1357 }
1358
1359 #endif /* AFS_DEMAND_ATTACH_FS */