volscan: avoid printing null mount-point cellname
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 #include <afs/procmgmt.h>
30 #include <roken.h>
31
32 #include <stddef.h>
33
34 #include <afs/opr.h>
35 #include <opr/lock.h>
36 #include <afs/afsint.h>
37 #include <rx/rx_queue.h>
38
39 #include "nfs.h"
40 #include <afs/errors.h>
41 #include "salvsync.h"
42 #include "lock.h"
43 #include <afs/afssyscalls.h>
44 #include "ihandle.h"
45 #include "vnode.h"
46 #include "volume.h"
47 #include "partition.h"
48 #include "common.h"
49 #include <rx/rx_queue.h>
50
51 #ifdef USE_UNIX_SOCKETS
52 #include <afs/afsutil.h>
53 #include <sys/un.h>
54 #endif
55
56 #ifndef WCOREDUMP
57 #define WCOREDUMP(x)    ((x) & 0200)
58 #endif
59
60 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
61                                  * move = dump+restore can run on single server */
62
63
64 /*
65  * This lock controls access to the handler array.
66  */
67 struct Lock SALVSYNC_handler_lock;
68
69
70 #ifdef AFS_DEMAND_ATTACH_FS
71 /*
72  * SALVSYNC is a feature specific to the demand attach fileserver
73  */
74
75 /* Forward declarations */
76 static void * SALVSYNC_syncThread(void *);
77 static void SALVSYNC_newconnection(osi_socket fd);
78 static void SALVSYNC_com(osi_socket fd);
79 static void SALVSYNC_Drop(osi_socket fd);
80 static void AcceptOn(void);
81 static void AcceptOff(void);
82 static void InitHandler(void);
83 static void CallHandler(fd_set * fdsetp);
84 static int AddHandler(osi_socket afd, void (*aproc) (int));
85 static int FindHandler(osi_socket afd);
86 static int FindHandler_r(osi_socket afd);
87 static int RemoveHandler(osi_socket afd);
88 static void GetHandler(fd_set * fdsetp, int *maxfdp);
89
90 static int AllocNode(struct SalvageQueueNode ** node);
91
92 static int AddToSalvageQueue(struct SalvageQueueNode * node);
93 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
94 static void AddToPendingQueue(struct SalvageQueueNode * node);
95 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
96 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
97 static void UpdateCommandPrio(struct SalvageQueueNode * node);
98 static void HandlePrio(struct SalvageQueueNode * clone,
99                        struct SalvageQueueNode * parent,
100                        afs_uint32 new_prio);
101
102 static int LinkNode(struct SalvageQueueNode * parent,
103                     struct SalvageQueueNode * clone);
104
105 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
106                                             struct SalvageQueueNode ** parent);
107 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
108                                                      struct SalvageQueueNode ** parent);
109 static void AddNodeToHash(struct SalvageQueueNode * node);
110
111 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
112 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
113 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
114 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
115 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
116
117
118 extern int LogLevel;
119 extern int VInit;
120 extern pthread_mutex_t vol_salvsync_mutex;
121
122 /**
123  * salvsync server socket handle.
124  */
125 static SYNC_server_state_t salvsync_server_state =
126     { OSI_NULLSOCKET,                       /* file descriptor */
127       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
128       SALVSYNC_PROTO_VERSION,   /* protocol version */
129       5,                        /* bind() retry limit */
130       100,                      /* listen() queue depth */
131       "SALVSYNC",               /* protocol name string */
132     };
133
134
135 /**
136  * queue of all volumes waiting to be salvaged.
137  */
138 struct SalvageQueue {
139     volatile int total_len;
140     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
141     volatile int len[VOLMAXPARTS+1];
142     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
143     pthread_cond_t cv;
144 };
145 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
146
147 /**
148  * queue of all volumes currently being salvaged.
149  */
150 struct QueueHead {
151     volatile struct rx_queue q;  /**< queue of salvages in progress */
152     volatile int len;            /**< length of in-progress queue */
153     pthread_cond_t queue_change_cv;
154 };
155 static struct QueueHead pendingQueue;  /* volumes being salvaged */
156
157 /* XXX
158  * whether a partition has a salvage in progress
159  *
160  * the salvager code only permits one salvage per partition at a time
161  *
162  * the following hack tries to keep salvaged parallelism high by
163  * only permitting one salvage dispatch per partition at a time
164  *
165  * unfortunately, the parallel salvager currently
166  * has a rather braindead routine that won't permit
167  * multiple salvages on the same "device".  this
168  * function happens to break pretty badly on lvm, raid luns, etc.
169  *
170  * this hack isn't good enough to stop the device limiting code from
171  * crippling performance.  someday that code needs to be rewritten
172  */
173 static int partition_salvaging[VOLMAXPARTS+1];
174
175 static int HandlerFD[MAXHANDLERS];
176 static void (*HandlerProc[MAXHANDLERS]) (int);
177
178 #define VSHASH_SIZE 64
179 #define VSHASH_MASK (VSHASH_SIZE-1)
180 #define VSHASH(vid) ((vid)&VSHASH_MASK)
181
182 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
183
184 static struct SalvageQueueNode *
185 LookupNode(VolumeId vid, char * partName,
186            struct SalvageQueueNode ** parent)
187 {
188     struct rx_queue *qp, *nqp;
189     struct SalvageQueueNode *vsp = NULL;
190     int idx = VSHASH(vid);
191
192     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
193         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
194         if ((vsp->command.sop.volume == vid) &&
195             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
196             break;
197         }
198     }
199
200     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
201         vsp = NULL;
202     }
203
204     if (parent) {
205         if (vsp) {
206             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
207                 vsp->volgroup.parent : vsp;
208         } else {
209             *parent = NULL;
210         }
211     }
212
213     return vsp;
214 }
215
216 static struct SalvageQueueNode *
217 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
218                     struct SalvageQueueNode ** parent)
219 {
220     return LookupNode(qry->volume, qry->partName, parent);
221 }
222
223 static void
224 AddNodeToHash(struct SalvageQueueNode * node)
225 {
226     int idx = VSHASH(node->command.sop.volume);
227
228     if (queue_IsOnQueue(&node->hash_chain)) {
229         return;
230     }
231
232     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
233     SalvageHashTable[idx].len++;
234 }
235
236 #if 0
237 static void
238 DeleteNodeFromHash(struct SalvageQueueNode * node)
239 {
240     int idx = VSHASH(node->command.sop.volume);
241
242     if (queue_IsNotOnQueue(&node->hash_chain)) {
243         return;
244     }
245
246     queue_Remove(&node->hash_chain);
247     SalvageHashTable[idx].len--;
248 }
249 #endif
250
251 void
252 SALVSYNC_salvInit(void)
253 {
254     int i;
255     pthread_t tid;
256     pthread_attr_t tattr;
257
258     /* initialize the queues */
259     Lock_Init(&SALVSYNC_handler_lock);
260     CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
261     for (i = 0; i <= VOLMAXPARTS; i++) {
262         queue_Init(&salvageQueue.part[i]);
263         salvageQueue.len[i] = 0;
264     }
265     CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
266     queue_Init(&pendingQueue);
267     salvageQueue.total_len = pendingQueue.len = 0;
268     salvageQueue.last_insert = -1;
269     memset(partition_salvaging, 0, sizeof(partition_salvaging));
270
271     for (i = 0; i < VSHASH_SIZE; i++) {
272         CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
273         SalvageHashTable[i].len = 0;
274         queue_Init(&SalvageHashTable[i]);
275     }
276
277     /* start the salvsync thread */
278     opr_Verify(pthread_attr_init(&tattr) == 0);
279     opr_Verify(pthread_attr_setdetachstate(&tattr,
280                                            PTHREAD_CREATE_DETACHED) == 0);
281     opr_Verify(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
282 }
283
284 static void
285 CleanFDs(void)
286 {
287     int i;
288     for (i = 0; i < MAXHANDLERS; ++i) {
289         if (HandlerFD[i] >= 0) {
290             SALVSYNC_Drop(HandlerFD[i]);
291         }
292     }
293
294     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
295      * have a handler */
296     close(salvsync_server_state.fd);
297     salvsync_server_state.fd = OSI_NULLSOCKET;
298 }
299
300 static fd_set SALVSYNC_readfds;
301
302 static void *
303 SALVSYNC_syncThread(void * args)
304 {
305     int code;
306     SYNC_server_state_t * state = &salvsync_server_state;
307
308     /* when we fork, the child needs to close the salvsync server sockets,
309      * otherwise, it may get salvsync requests, instead of the parent
310      * salvageserver */
311     opr_Verify(pthread_atfork(NULL, NULL, CleanFDs) == 0);
312
313     SYNC_getAddr(&state->endpoint, &state->addr);
314     SYNC_cleanupSock(state);
315
316 #ifndef AFS_NT40_ENV
317     (void)signal(SIGPIPE, SIG_IGN);
318 #endif
319
320     state->fd = SYNC_getSock(&state->endpoint);
321     code = SYNC_bindSock(state);
322     opr_Assert(!code);
323
324     InitHandler();
325     AcceptOn();
326
327     for (;;) {
328         int maxfd;
329         struct timeval s_timeout;
330         GetHandler(&SALVSYNC_readfds, &maxfd);
331         s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
332         s_timeout.tv_usec = 0;
333         /* Note: check for >= 1 below is essential since IOMGR_select
334          * doesn't have exactly same semantics as select.
335          */
336         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
337             CallHandler(&SALVSYNC_readfds);
338     }
339
340     return NULL;
341 }
342
343 static void
344 SALVSYNC_newconnection(int afd)
345 {
346 #ifdef USE_UNIX_SOCKETS
347     struct sockaddr_un other;
348 #else  /* USE_UNIX_SOCKETS */
349     struct sockaddr_in other;
350 #endif
351     int fd;
352     socklen_t junk;
353
354     junk = sizeof(other);
355     fd = accept(afd, (struct sockaddr *)&other, &junk);
356     if (fd == OSI_NULLSOCKET) {
357         osi_Panic("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
358     } else if (!AddHandler(fd, SALVSYNC_com)) {
359         AcceptOff();
360         opr_Verify(AddHandler(fd, SALVSYNC_com));
361     }
362 }
363
364 /* this function processes commands from an salvsync file descriptor (fd) */
365 static afs_int32 SALV_cnt = 0;
366 static void
367 SALVSYNC_com(osi_socket fd)
368 {
369     SYNC_command com;
370     SYNC_response res;
371     SALVSYNC_response_hdr sres_hdr;
372     SALVSYNC_command scom;
373     SALVSYNC_response sres;
374     SYNC_PROTO_BUF_DECL(buf);
375
376     memset(&com, 0, sizeof(com));
377     memset(&res, 0, sizeof(res));
378     memset(&scom, 0, sizeof(scom));
379     memset(&sres, 0, sizeof(sres));
380     memset(&sres_hdr, 0, sizeof(sres_hdr));
381
382     com.payload.buf = (void *)buf;
383     com.payload.len = SYNC_PROTO_MAX_LEN;
384     res.payload.buf = (void *) &sres_hdr;
385     res.payload.len = sizeof(sres_hdr);
386     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
387     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
388
389     scom.hdr = &com.hdr;
390     scom.sop = (SALVSYNC_command_hdr *) buf;
391     scom.com = &com;
392     sres.hdr = &res.hdr;
393     sres.sop = &sres_hdr;
394     sres.res = &res;
395
396     SALV_cnt++;
397     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
398         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
399         SALVSYNC_Drop(fd);
400         return;
401     }
402
403     if (com.recv_len < sizeof(com.hdr)) {
404         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
405         res.hdr.response = SYNC_COM_ERROR;
406         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
407         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
408         goto respond;
409     }
410
411     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
412         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
413         res.hdr.response = SYNC_COM_ERROR;
414         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
415         goto respond;
416     }
417
418     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
419         res.hdr.response = SYNC_OK;
420         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
421
422         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
423          * never wait for a response. */
424         goto done;
425     }
426
427     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
428         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
429         res.hdr.response = SYNC_COM_ERROR;
430         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
431         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
432         goto respond;
433     }
434
435     res.hdr.com_seq = com.hdr.com_seq;
436
437     VOL_LOCK;
438     switch (com.hdr.command) {
439     case SALVSYNC_NOP:
440         break;
441     case SALVSYNC_SALVAGE:
442     case SALVSYNC_RAISEPRIO:
443         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
444         break;
445     case SALVSYNC_CANCEL:
446         /* cancel a salvage */
447         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
448         break;
449     case SALVSYNC_CANCELALL:
450         /* cancel all queued salvages */
451         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
452         break;
453     case SALVSYNC_QUERY:
454         /* query whether a volume is done salvaging */
455         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
456         break;
457     case SALVSYNC_OP_LINK:
458         /* link a clone to its parent in the scheduler */
459         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
460         break;
461     default:
462         res.hdr.response = SYNC_BAD_COMMAND;
463         break;
464     }
465
466     sres_hdr.sq_len = salvageQueue.total_len;
467     sres_hdr.pq_len = pendingQueue.len;
468     VOL_UNLOCK;
469
470  respond:
471     SYNC_putRes(&salvsync_server_state, fd, &res);
472
473  done:
474     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
475         SALVSYNC_Drop(fd);
476     }
477 }
478
479 /**
480  * request that a volume be salvaged.
481  *
482  * @param[in]  com  inbound command object
483  * @param[out] res  outbound response object
484  *
485  * @return operation status
486  *    @retval SYNC_OK success
487  *    @retval SYNC_DENIED failed to enqueue request
488  *    @retval SYNC_FAILED malformed command packet
489  *
490  * @note this is a SALVSYNC protocol rpc handler
491  *
492  * @internal
493  *
494  * @post the volume is enqueued in the to-be-salvaged queue.
495  *       if the volume was already in the salvage queue, its
496  *       priority (and thus its location in the queue) are
497  *       updated.
498  */
499 static afs_int32
500 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
501 {
502     afs_int32 code = SYNC_OK;
503     struct SalvageQueueNode * node, * clone;
504     int hash = 0;
505
506     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
507         code = SYNC_FAILED;
508         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
509         goto done;
510     }
511
512     clone = LookupNodeByCommand(com->sop, &node);
513
514     if (node == NULL) {
515         if (AllocNode(&node)) {
516             code = SYNC_DENIED;
517             res->hdr->reason = SYNC_REASON_NOMEM;
518             goto done;
519         }
520         clone = node;
521         hash = 1;
522     }
523
524     HandlePrio(clone, node, com->sop->prio);
525
526     switch (node->state) {
527     case SALVSYNC_STATE_QUEUED:
528         UpdateCommandPrio(node);
529         break;
530
531     case SALVSYNC_STATE_ERROR:
532     case SALVSYNC_STATE_DONE:
533     case SALVSYNC_STATE_UNKNOWN:
534         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
535         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
536
537         /*
538          * make sure volgroup parent partition path is kept coherent
539          *
540          * If we ever want to support non-COW clones on a machine holding
541          * the RW site, please note that this code does not work under the
542          * conditions where someone zaps a COW clone on partition X, and
543          * subsequently creates a full clone on partition Y -- we'd need
544          * an inverse to SALVSYNC_com_Link.
545          *  -- tkeiser 11/28/2007
546          */
547         strcpy(node->command.sop.partName, com->sop->partName);
548
549         if (AddToSalvageQueue(node)) {
550             code = SYNC_DENIED;
551         }
552         break;
553
554     default:
555         break;
556     }
557
558     if (hash) {
559         AddNodeToHash(node);
560     }
561
562     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
563     res->sop->state = node->state;
564     res->sop->prio = node->command.sop.prio;
565
566  done:
567     return code;
568 }
569
570 /**
571  * cancel a pending salvage request.
572  *
573  * @param[in]  com  inbound command object
574  * @param[out] res  outbound response object
575  *
576  * @return operation status
577  *    @retval SYNC_OK success
578  *    @retval SYNC_FAILED malformed command packet
579  *
580  * @note this is a SALVSYNC protocol rpc handler
581  *
582  * @internal
583  */
584 static afs_int32
585 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
586 {
587     afs_int32 code = SYNC_OK;
588     struct SalvageQueueNode * node;
589
590     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
591         code = SYNC_FAILED;
592         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
593         goto done;
594     }
595
596     node = LookupNodeByCommand(com->sop, NULL);
597
598     if (node == NULL) {
599         res->sop->state = SALVSYNC_STATE_UNKNOWN;
600         res->sop->prio = 0;
601     } else {
602         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
603         res->sop->prio = node->command.sop.prio;
604         res->sop->state = node->state;
605         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
606             (node->state == SALVSYNC_STATE_QUEUED)) {
607             DeleteFromSalvageQueue(node);
608         }
609     }
610
611  done:
612     return code;
613 }
614
615 /**
616  * cancel all pending salvage requests.
617  *
618  * @param[in]  com  incoming command object
619  * @param[out] res  outbound response object
620  *
621  * @return operation status
622  *    @retval SYNC_OK success
623  *
624  * @note this is a SALVSYNC protocol rpc handler
625  *
626  * @internal
627  */
628 static afs_int32
629 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
630 {
631     struct SalvageQueueNode * np, *nnp;
632     struct DiskPartition64 * dp;
633
634     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
635         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
636             DeleteFromSalvageQueue(np);
637         }
638     }
639
640     return SYNC_OK;
641 }
642
643 /**
644  * link a queue node for a clone to its parent volume.
645  *
646  * @param[in]  com   inbound command object
647  * @param[out] res   outbound response object
648  *
649  * @return operation status
650  *    @retval SYNC_OK success
651  *    @retval SYNC_FAILED malformed command packet
652  *    @retval SYNC_DENIED the request could not be completed
653  *
654  * @note this is a SALVSYNC protocol rpc handler
655  *
656  * @post the requested volume is marked as a child of another volume.
657  *       thus, future salvage requests for this volume will result in the
658  *       parent of the volume group being scheduled for salvage instead
659  *       of this clone.
660  *
661  * @internal
662  */
663 static afs_int32
664 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
665 {
666     afs_int32 code = SYNC_OK;
667     struct SalvageQueueNode * clone, * parent;
668
669     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
670         code = SYNC_FAILED;
671         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
672         goto done;
673     }
674
675     /* lookup clone's salvage scheduling node */
676     clone = LookupNodeByCommand(com->sop, NULL);
677     if (clone == NULL) {
678         code = SYNC_DENIED;
679         res->hdr->reason = SALVSYNC_REASON_ERROR;
680         goto done;
681     }
682
683     /* lookup parent's salvage scheduling node */
684     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
685     if (parent == NULL) {
686         if (AllocNode(&parent)) {
687             code = SYNC_DENIED;
688             res->hdr->reason = SYNC_REASON_NOMEM;
689             goto done;
690         }
691         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
692         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
693         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
694         AddNodeToHash(parent);
695     }
696
697     if (LinkNode(parent, clone)) {
698         code = SYNC_DENIED;
699         goto done;
700     }
701
702  done:
703     return code;
704 }
705
706 /**
707  * query the status of a volume salvage request.
708  *
709  * @param[in]  com   inbound command object
710  * @param[out] res   outbound response object
711  *
712  * @return operation status
713  *    @retval SYNC_OK success
714  *    @retval SYNC_FAILED malformed command packet
715  *
716  * @note this is a SALVSYNC protocol rpc handler
717  *
718  * @internal
719  */
720 static afs_int32
721 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
722 {
723     afs_int32 code = SYNC_OK;
724     struct SalvageQueueNode * node;
725
726     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
727         code = SYNC_FAILED;
728         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
729         goto done;
730     }
731
732     LookupNodeByCommand(com->sop, &node);
733
734     /* query whether a volume is done salvaging */
735     if (node == NULL) {
736         res->sop->state = SALVSYNC_STATE_UNKNOWN;
737         res->sop->prio = 0;
738     } else {
739         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
740         res->sop->state = node->state;
741         res->sop->prio = node->command.sop.prio;
742     }
743
744  done:
745     return code;
746 }
747
748 static void
749 SALVSYNC_Drop(osi_socket fd)
750 {
751     RemoveHandler(fd);
752     rk_closesocket(fd);
753     AcceptOn();
754 }
755
756 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
757
758 static void
759 AcceptOn(void)
760 {
761     if (AcceptHandler == -1) {
762         opr_Verify(AddHandler(salvsync_server_state.fd,
763                               SALVSYNC_newconnection));
764         AcceptHandler = FindHandler(salvsync_server_state.fd);
765     }
766 }
767
768 static void
769 AcceptOff(void)
770 {
771     if (AcceptHandler != -1) {
772         opr_Verify(RemoveHandler(salvsync_server_state.fd));
773         AcceptHandler = -1;
774     }
775 }
776
777 /* The multiple FD handling code. */
778
779 static void
780 InitHandler(void)
781 {
782     int i;
783     ObtainWriteLock(&SALVSYNC_handler_lock);
784     for (i = 0; i < MAXHANDLERS; i++) {
785         HandlerFD[i] = OSI_NULLSOCKET;
786         HandlerProc[i] = NULL;
787     }
788     ReleaseWriteLock(&SALVSYNC_handler_lock);
789 }
790
791 static void
792 CallHandler(fd_set * fdsetp)
793 {
794     int i;
795     ObtainReadLock(&SALVSYNC_handler_lock);
796     for (i = 0; i < MAXHANDLERS; i++) {
797         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
798             ReleaseReadLock(&SALVSYNC_handler_lock);
799             (*HandlerProc[i]) (HandlerFD[i]);
800             ObtainReadLock(&SALVSYNC_handler_lock);
801         }
802     }
803     ReleaseReadLock(&SALVSYNC_handler_lock);
804 }
805
806 static int
807 AddHandler(osi_socket afd, void (*aproc) (int))
808 {
809     int i;
810     ObtainWriteLock(&SALVSYNC_handler_lock);
811     for (i = 0; i < MAXHANDLERS; i++)
812         if (HandlerFD[i] == OSI_NULLSOCKET)
813             break;
814     if (i >= MAXHANDLERS) {
815         ReleaseWriteLock(&SALVSYNC_handler_lock);
816         return 0;
817     }
818     HandlerFD[i] = afd;
819     HandlerProc[i] = aproc;
820     ReleaseWriteLock(&SALVSYNC_handler_lock);
821     return 1;
822 }
823
824 static int
825 FindHandler(osi_socket afd)
826 {
827     int i;
828     ObtainReadLock(&SALVSYNC_handler_lock);
829     for (i = 0; i < MAXHANDLERS; i++)
830         if (HandlerFD[i] == afd) {
831             ReleaseReadLock(&SALVSYNC_handler_lock);
832             return i;
833         }
834     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
835     osi_Panic("Failed to find handler\n");
836     return -1;                  /* satisfy compiler */
837 }
838
839 static int
840 FindHandler_r(osi_socket afd)
841 {
842     int i;
843     for (i = 0; i < MAXHANDLERS; i++)
844         if (HandlerFD[i] == afd) {
845             return i;
846         }
847     osi_Panic("Failed to find handler\n");
848     return -1;                  /* satisfy compiler */
849 }
850
851 static int
852 RemoveHandler(osi_socket afd)
853 {
854     ObtainWriteLock(&SALVSYNC_handler_lock);
855     HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
856     ReleaseWriteLock(&SALVSYNC_handler_lock);
857     return 1;
858 }
859
860 static void
861 GetHandler(fd_set * fdsetp, int *maxfdp)
862 {
863     int i;
864     int maxfd = -1;
865     FD_ZERO(fdsetp);
866     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
867     for (i = 0; i < MAXHANDLERS; i++)
868         if (HandlerFD[i] != OSI_NULLSOCKET) {
869             FD_SET(HandlerFD[i], fdsetp);
870 #ifndef AFS_NT40_ENV
871             /* On Windows the nfds parameter to select() is ignored */
872             if (maxfd < HandlerFD[i] || maxfd == (int)-1)
873                 maxfd = HandlerFD[i];
874 #endif
875         }
876     *maxfdp = maxfd;
877     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
878 }
879
880 /**
881  * allocate a salvage queue node.
882  *
883  * @param[out] node_out  address in which to store new node pointer
884  *
885  * @return operation status
886  *    @retval 0 success
887  *    @retval 1 failed to allocate node
888  *
889  * @internal
890  */
891 static int
892 AllocNode(struct SalvageQueueNode ** node_out)
893 {
894     int code = 0;
895     struct SalvageQueueNode * node;
896
897     *node_out = node = calloc(1, sizeof(struct SalvageQueueNode));
898     if (node == NULL) {
899         code = 1;
900         goto done;
901     }
902
903     node->type = SALVSYNC_VOLGROUP_PARENT;
904     node->state = SALVSYNC_STATE_UNKNOWN;
905
906  done:
907     return code;
908 }
909
910 /**
911  * link a salvage queue node to its parent.
912  *
913  * @param[in] parent  pointer to queue node for parent of volume group
914  * @param[in] clone   pointer to queue node for a clone
915  *
916  * @return operation status
917  *    @retval 0 success
918  *    @retval 1 failure
919  *
920  * @internal
921  */
922 static int
923 LinkNode(struct SalvageQueueNode * parent,
924          struct SalvageQueueNode * clone)
925 {
926     int code = 0;
927     int idx;
928
929     /* check for attaching a clone to a clone */
930     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
931         code = 1;
932         goto done;
933     }
934
935     /* check for pre-existing registration and openings */
936     for (idx = 0; idx < VOLMAXTYPES; idx++) {
937         if (parent->volgroup.children[idx] == clone) {
938             goto linked;
939         }
940         if (parent->volgroup.children[idx] == NULL) {
941             break;
942         }
943     }
944     if (idx == VOLMAXTYPES) {
945         code = 1;
946         goto done;
947     }
948
949     /* link parent and child */
950     parent->volgroup.children[idx] = clone;
951     clone->type = SALVSYNC_VOLGROUP_CLONE;
952     clone->volgroup.parent = parent;
953
954
955  linked:
956     switch (clone->state) {
957     case SALVSYNC_STATE_QUEUED:
958         DeleteFromSalvageQueue(clone);
959
960     case SALVSYNC_STATE_SALVAGING:
961         switch (parent->state) {
962         case SALVSYNC_STATE_UNKNOWN:
963         case SALVSYNC_STATE_ERROR:
964         case SALVSYNC_STATE_DONE:
965             parent->command.sop.prio = clone->command.sop.prio;
966             AddToSalvageQueue(parent);
967             break;
968
969         case SALVSYNC_STATE_QUEUED:
970             if (clone->command.sop.prio) {
971                 parent->command.sop.prio += clone->command.sop.prio;
972                 UpdateCommandPrio(parent);
973             }
974             break;
975
976         default:
977             break;
978         }
979         break;
980
981     default:
982         break;
983     }
984
985  done:
986     return code;
987 }
988
989 static void
990 HandlePrio(struct SalvageQueueNode * clone,
991            struct SalvageQueueNode * node,
992            afs_uint32 new_prio)
993 {
994     afs_uint32 delta;
995
996     switch (node->state) {
997     case SALVSYNC_STATE_ERROR:
998     case SALVSYNC_STATE_DONE:
999     case SALVSYNC_STATE_UNKNOWN:
1000         node->command.sop.prio = 0;
1001         break;
1002     default:
1003         break;
1004     }
1005
1006     if (new_prio < clone->command.sop.prio) {
1007         /* strange. let's just set our delta to 1 */
1008         delta = 1;
1009     } else {
1010         delta = new_prio - clone->command.sop.prio;
1011     }
1012
1013     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1014         clone->command.sop.prio = new_prio;
1015     }
1016
1017     node->command.sop.prio += delta;
1018 }
1019
1020 static int
1021 AddToSalvageQueue(struct SalvageQueueNode * node)
1022 {
1023     afs_int32 id;
1024     struct SalvageQueueNode * last = NULL;
1025
1026     id = volutil_GetPartitionID(node->command.sop.partName);
1027     if (id < 0 || id > VOLMAXPARTS) {
1028         return 1;
1029     }
1030     if (!VGetPartitionById_r(id, 0)) {
1031         /* don't enqueue salvage requests for unmounted partitions */
1032         return 1;
1033     }
1034     if (queue_IsOnQueue(node)) {
1035         return 0;
1036     }
1037
1038     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1039         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1040     }
1041     queue_Append(&salvageQueue.part[id], node);
1042     salvageQueue.len[id]++;
1043     salvageQueue.total_len++;
1044     salvageQueue.last_insert = id;
1045     node->partition_id = id;
1046     node->state = SALVSYNC_STATE_QUEUED;
1047
1048     /* reorder, if necessary */
1049     if (last && last->command.sop.prio < node->command.sop.prio) {
1050         UpdateCommandPrio(node);
1051     }
1052
1053     CV_BROADCAST(&salvageQueue.cv);
1054     return 0;
1055 }
1056
1057 static void
1058 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1059 {
1060     if (queue_IsOnQueue(node)) {
1061         queue_Remove(node);
1062         salvageQueue.len[node->partition_id]--;
1063         salvageQueue.total_len--;
1064         node->state = SALVSYNC_STATE_UNKNOWN;
1065         CV_BROADCAST(&salvageQueue.cv);
1066     }
1067 }
1068
1069 static void
1070 AddToPendingQueue(struct SalvageQueueNode * node)
1071 {
1072     queue_Append(&pendingQueue, node);
1073     pendingQueue.len++;
1074     node->state = SALVSYNC_STATE_SALVAGING;
1075     CV_BROADCAST(&pendingQueue.queue_change_cv);
1076 }
1077
1078 static void
1079 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1080 {
1081     if (queue_IsOnQueue(node)) {
1082         queue_Remove(node);
1083         pendingQueue.len--;
1084         node->state = SALVSYNC_STATE_UNKNOWN;
1085         CV_BROADCAST(&pendingQueue.queue_change_cv);
1086     }
1087 }
1088
1089 #if 0
1090 static struct SalvageQueueNode *
1091 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1092 {
1093     struct SalvageQueueNode * np, * nnp;
1094
1095     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1096         if ((np->command.sop.volume == qry->volume) &&
1097             !strncmp(np->command.sop.partName, qry->partName,
1098                      sizeof(qry->partName)))
1099             break;
1100     }
1101
1102     if (queue_IsEnd(&pendingQueue, np))
1103         np = NULL;
1104     return np;
1105 }
1106 #endif
1107
1108 static struct SalvageQueueNode *
1109 LookupPendingCommandByPid(int pid)
1110 {
1111     struct SalvageQueueNode * np, * nnp;
1112
1113     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1114         if (np->pid == pid)
1115             break;
1116     }
1117
1118     if (queue_IsEnd(&pendingQueue, np))
1119         np = NULL;
1120     return np;
1121 }
1122
1123
1124 /* raise the priority of a previously scheduled salvage */
1125 static void
1126 UpdateCommandPrio(struct SalvageQueueNode * node)
1127 {
1128     struct SalvageQueueNode *np, *nnp;
1129     afs_int32 id;
1130     afs_uint32 prio;
1131
1132     opr_Assert(queue_IsOnQueue(node));
1133
1134     prio = node->command.sop.prio;
1135     id = node->partition_id;
1136     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1137         queue_Remove(node);
1138         queue_Prepend(&salvageQueue.part[id], node);
1139     } else {
1140         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1141             if (np->command.sop.prio > prio)
1142                 break;
1143         }
1144         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1145             queue_Remove(node);
1146             queue_Prepend(&salvageQueue.part[id], node);
1147         } else if (node != np) {
1148             queue_Remove(node);
1149             queue_InsertAfter(np, node);
1150         }
1151     }
1152 }
1153
1154 /* this will need to be rearchitected if we ever want more than one thread
1155  * to wait for new salvage nodes */
1156 struct SalvageQueueNode *
1157 SALVSYNC_getWork(void)
1158 {
1159     int i;
1160     struct DiskPartition64 * dp = NULL, * fdp;
1161     static afs_int32 next_part_sched = 0;
1162     struct SalvageQueueNode *node = NULL;
1163
1164     VOL_LOCK;
1165
1166     /*
1167      * wait for work to be scheduled
1168      * if there are no disk partitions, just sit in this wait loop forever
1169      */
1170     while (!salvageQueue.total_len || !DiskPartitionList) {
1171         VOL_CV_WAIT(&salvageQueue.cv);
1172     }
1173
1174     /*
1175      * short circuit for simple case where only one partition has
1176      * scheduled salvages
1177      */
1178     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1179         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1180         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1181         goto have_node;
1182     }
1183
1184
1185     /*
1186      * ok, more than one partition has scheduled salvages.
1187      * now search for partitions with scheduled salvages, but no pending salvages.
1188      */
1189     dp = VGetPartitionById_r(next_part_sched, 0);
1190     if (!dp) {
1191         dp = DiskPartitionList;
1192     }
1193     fdp = dp;
1194
1195     for (i=0 ;
1196          !i || dp != fdp ;
1197          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1198         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1199             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1200             goto have_node;
1201         }
1202     }
1203
1204
1205     /*
1206      * all partitions with scheduled salvages have at least one pending.
1207      * now do an exhaustive search for a scheduled salvage.
1208      */
1209     dp = fdp;
1210
1211     for (i=0 ;
1212          !i || dp != fdp ;
1213          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1214         if (salvageQueue.len[dp->index]) {
1215             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1216             goto have_node;
1217         }
1218     }
1219
1220     /* we should never reach this line */
1221     osi_Panic("Node not found\n");
1222
1223  have_node:
1224     opr_Assert(node != NULL);
1225     node->pid = 0;
1226     partition_salvaging[node->partition_id]++;
1227     DeleteFromSalvageQueue(node);
1228     AddToPendingQueue(node);
1229
1230     if (dp) {
1231         /* update next_part_sched field */
1232         if (dp->next) {
1233             next_part_sched = dp->next->index;
1234         } else if (DiskPartitionList) {
1235             next_part_sched = DiskPartitionList->index;
1236         } else {
1237             next_part_sched = -1;
1238         }
1239     }
1240
1241     VOL_UNLOCK;
1242     return node;
1243 }
1244
1245 /**
1246  * update internal scheduler state to reflect completion of a work unit.
1247  *
1248  * @param[in]  node    salvage queue node object pointer
1249  * @param[in]  result  worker process result code
1250  *
1251  * @post scheduler state is updated.
1252  *
1253  * @internal
1254  */
1255 static void
1256 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1257 {
1258     afs_int32 partid;
1259     int idx;
1260
1261     DeleteFromPendingQueue(node);
1262     partid = node->partition_id;
1263     if (partid >=0 && partid <= VOLMAXPARTS) {
1264         partition_salvaging[partid]--;
1265     }
1266     if (result == 0) {
1267         node->state = SALVSYNC_STATE_DONE;
1268     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1269         node->state = SALVSYNC_STATE_ERROR;
1270     }
1271
1272     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1273         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1274             if (node->volgroup.children[idx]) {
1275                 node->volgroup.children[idx]->state = node->state;
1276             }
1277         }
1278     }
1279 }
1280
1281 /**
1282  * check whether worker child failed.
1283  *
1284  * @param[in] status  status bitfield return by wait()
1285  *
1286  * @return boolean failure code
1287  *    @retval 0 child succeeded
1288  *    @retval 1 child failed
1289  *
1290  * @internal
1291  */
1292 static int
1293 ChildFailed(int status)
1294 {
1295     return (WCOREDUMP(status) ||
1296             WIFSIGNALED(status) ||
1297             ((WEXITSTATUS(status) != 0) &&
1298              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1299 }
1300
1301
1302 /**
1303  * notify salvsync scheduler of node completion, by child pid.
1304  *
1305  * @param[in]  pid     pid of worker child
1306  * @param[in]  status  worker status bitfield from wait()
1307  *
1308  * @post scheduler state is updated.
1309  *       if status code is a failure, fileserver notification was attempted
1310  *
1311  * @see SALVSYNC_doneWork_r
1312  */
1313 void
1314 SALVSYNC_doneWorkByPid(int pid, int status)
1315 {
1316     struct SalvageQueueNode * node;
1317     char partName[16];
1318     VolumeId volids[VOLMAXTYPES+1];
1319     unsigned int idx;
1320
1321     memset(volids, 0, sizeof(volids));
1322
1323     VOL_LOCK;
1324     node = LookupPendingCommandByPid(pid);
1325     if (node != NULL) {
1326         SALVSYNC_doneWork_r(node, status);
1327
1328         if (ChildFailed(status)) {
1329             /* populate volume id list for later processing outside the glock */
1330             volids[0] = node->command.sop.volume;
1331             strcpy(partName, node->command.sop.partName);
1332             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1333                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1334                     if (node->volgroup.children[idx]) {
1335                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1336                     }
1337                 }
1338             }
1339         }
1340     }
1341     VOL_UNLOCK;
1342
1343     /*
1344      * if necessary, notify fileserver of
1345      * failure to salvage volume group
1346      * [we cannot guarantee that the child made the
1347      *  appropriate notifications (e.g. SIGSEGV)]
1348      *  -- tkeiser 11/28/2007
1349      */
1350     if (ChildFailed(status)) {
1351         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1352             if (volids[idx]) {
1353                 FSYNC_VolOp(volids[idx],
1354                             partName,
1355                             FSYNC_VOL_FORCE_ERROR,
1356                             FSYNC_WHATEVER,
1357                             NULL);
1358             }
1359         }
1360     }
1361 }
1362
1363 #endif /* AFS_DEMAND_ATTACH_FS */