vol: Remove unneeded braces
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 #include <afs/procmgmt.h>
30 #include <roken.h>
31
32 #include <stddef.h>
33
34 #ifdef AFS_NT40_ENV
35 #include <winsock2.h>
36 #endif
37
38 #include <rx/xdr.h>
39 #include <afs/afsint.h>
40 #include "nfs.h"
41 #include <afs/errors.h>
42 #include "salvsync.h"
43 #include "lwp.h"
44 #include "lock.h"
45 #include <afs/afssyscalls.h>
46 #include "ihandle.h"
47 #include "vnode.h"
48 #include "volume.h"
49 #include "partition.h"
50 #include "common.h"
51 #include <rx/rx_queue.h>
52
53 #ifdef USE_UNIX_SOCKETS
54 #include <afs/afsutil.h>
55 #include <sys/un.h>
56 #endif
57
58 #ifndef WCOREDUMP
59 #define WCOREDUMP(x)    ((x) & 0200)
60 #endif
61
62 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
63                                  * move = dump+restore can run on single server */
64
65
66 /*
67  * This lock controls access to the handler array.
68  */
69 struct Lock SALVSYNC_handler_lock;
70
71
72 #ifdef AFS_DEMAND_ATTACH_FS
73 /*
74  * SALVSYNC is a feature specific to the demand attach fileserver
75  */
76
77 /* Forward declarations */
78 static void * SALVSYNC_syncThread(void *);
79 static void SALVSYNC_newconnection(osi_socket fd);
80 static void SALVSYNC_com(osi_socket fd);
81 static void SALVSYNC_Drop(osi_socket fd);
82 static void AcceptOn(void);
83 static void AcceptOff(void);
84 static void InitHandler(void);
85 static void CallHandler(fd_set * fdsetp);
86 static int AddHandler(osi_socket afd, void (*aproc) (int));
87 static int FindHandler(osi_socket afd);
88 static int FindHandler_r(osi_socket afd);
89 static int RemoveHandler(osi_socket afd);
90 static void GetHandler(fd_set * fdsetp, int *maxfdp);
91
92 static int AllocNode(struct SalvageQueueNode ** node);
93
94 static int AddToSalvageQueue(struct SalvageQueueNode * node);
95 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
96 static void AddToPendingQueue(struct SalvageQueueNode * node);
97 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
98 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
99 static void UpdateCommandPrio(struct SalvageQueueNode * node);
100 static void HandlePrio(struct SalvageQueueNode * clone,
101                        struct SalvageQueueNode * parent,
102                        afs_uint32 new_prio);
103
104 static int LinkNode(struct SalvageQueueNode * parent,
105                     struct SalvageQueueNode * clone);
106
107 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
108                                             struct SalvageQueueNode ** parent);
109 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
110                                                      struct SalvageQueueNode ** parent);
111 static void AddNodeToHash(struct SalvageQueueNode * node);
112
113 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
114 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
115 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
116 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
117 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
118
119
120 extern int LogLevel;
121 extern int VInit;
122 extern pthread_mutex_t vol_salvsync_mutex;
123
124 /**
125  * salvsync server socket handle.
126  */
127 static SYNC_server_state_t salvsync_server_state =
128     { OSI_NULLSOCKET,                       /* file descriptor */
129       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
130       SALVSYNC_PROTO_VERSION,   /* protocol version */
131       5,                        /* bind() retry limit */
132       100,                      /* listen() queue depth */
133       "SALVSYNC",               /* protocol name string */
134     };
135
136
137 /**
138  * queue of all volumes waiting to be salvaged.
139  */
140 struct SalvageQueue {
141     volatile int total_len;
142     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
143     volatile int len[VOLMAXPARTS+1];
144     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
145     pthread_cond_t cv;
146 };
147 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
148
149 /**
150  * queue of all volumes currently being salvaged.
151  */
152 struct QueueHead {
153     volatile struct rx_queue q;  /**< queue of salvages in progress */
154     volatile int len;            /**< length of in-progress queue */
155     pthread_cond_t queue_change_cv;
156 };
157 static struct QueueHead pendingQueue;  /* volumes being salvaged */
158
159 /* XXX
160  * whether a partition has a salvage in progress
161  *
162  * the salvager code only permits one salvage per partition at a time
163  *
164  * the following hack tries to keep salvaged parallelism high by
165  * only permitting one salvage dispatch per partition at a time
166  *
167  * unfortunately, the parallel salvager currently
168  * has a rather braindead routine that won't permit
169  * multiple salvages on the same "device".  this
170  * function happens to break pretty badly on lvm, raid luns, etc.
171  *
172  * this hack isn't good enough to stop the device limiting code from
173  * crippling performance.  someday that code needs to be rewritten
174  */
175 static int partition_salvaging[VOLMAXPARTS+1];
176
177 static int HandlerFD[MAXHANDLERS];
178 static void (*HandlerProc[MAXHANDLERS]) (int);
179
180 #define VSHASH_SIZE 64
181 #define VSHASH_MASK (VSHASH_SIZE-1)
182 #define VSHASH(vid) ((vid)&VSHASH_MASK)
183
184 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
185
186 static struct SalvageQueueNode *
187 LookupNode(afs_uint32 vid, char * partName,
188            struct SalvageQueueNode ** parent)
189 {
190     struct rx_queue *qp, *nqp;
191     struct SalvageQueueNode *vsp;
192     int idx = VSHASH(vid);
193
194     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
195         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
196         if ((vsp->command.sop.volume == vid) &&
197             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
198             break;
199         }
200     }
201
202     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
203         vsp = NULL;
204     }
205
206     if (parent) {
207         if (vsp) {
208             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
209                 vsp->volgroup.parent : vsp;
210         } else {
211             *parent = NULL;
212         }
213     }
214
215     return vsp;
216 }
217
218 static struct SalvageQueueNode *
219 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
220                     struct SalvageQueueNode ** parent)
221 {
222     return LookupNode(qry->volume, qry->partName, parent);
223 }
224
225 static void
226 AddNodeToHash(struct SalvageQueueNode * node)
227 {
228     int idx = VSHASH(node->command.sop.volume);
229
230     if (queue_IsOnQueue(&node->hash_chain)) {
231         return;
232     }
233
234     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
235     SalvageHashTable[idx].len++;
236 }
237
238 #if 0
239 static void
240 DeleteNodeFromHash(struct SalvageQueueNode * node)
241 {
242     int idx = VSHASH(node->command.sop.volume);
243
244     if (queue_IsNotOnQueue(&node->hash_chain)) {
245         return;
246     }
247
248     queue_Remove(&node->hash_chain);
249     SalvageHashTable[idx].len--;
250 }
251 #endif
252
253 void
254 SALVSYNC_salvInit(void)
255 {
256     int i;
257     pthread_t tid;
258     pthread_attr_t tattr;
259
260     /* initialize the queues */
261     Lock_Init(&SALVSYNC_handler_lock);
262     CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
263     for (i = 0; i <= VOLMAXPARTS; i++) {
264         queue_Init(&salvageQueue.part[i]);
265         salvageQueue.len[i] = 0;
266     }
267     CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
268     queue_Init(&pendingQueue);
269     salvageQueue.total_len = pendingQueue.len = 0;
270     salvageQueue.last_insert = -1;
271     memset(partition_salvaging, 0, sizeof(partition_salvaging));
272
273     for (i = 0; i < VSHASH_SIZE; i++) {
274         CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
275         SalvageHashTable[i].len = 0;
276         queue_Init(&SalvageHashTable[i]);
277     }
278
279     /* start the salvsync thread */
280     osi_Assert(pthread_attr_init(&tattr) == 0);
281     osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
282     osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
283 }
284
285 static void
286 CleanFDs(void)
287 {
288     int i;
289     for (i = 0; i < MAXHANDLERS; ++i) {
290         if (HandlerFD[i] >= 0) {
291             SALVSYNC_Drop(HandlerFD[i]);
292         }
293     }
294
295     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
296      * have a handler */
297     close(salvsync_server_state.fd);
298     salvsync_server_state.fd = OSI_NULLSOCKET;
299 }
300
301 static fd_set SALVSYNC_readfds;
302
303 static void *
304 SALVSYNC_syncThread(void * args)
305 {
306     int code;
307     SYNC_server_state_t * state = &salvsync_server_state;
308
309     /* when we fork, the child needs to close the salvsync server sockets,
310      * otherwise, it may get salvsync requests, instead of the parent
311      * salvageserver */
312     osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
313
314     SYNC_getAddr(&state->endpoint, &state->addr);
315     SYNC_cleanupSock(state);
316
317 #ifndef AFS_NT40_ENV
318     (void)signal(SIGPIPE, SIG_IGN);
319 #endif
320
321     state->fd = SYNC_getSock(&state->endpoint);
322     code = SYNC_bindSock(state);
323     osi_Assert(!code);
324
325     InitHandler();
326     AcceptOn();
327
328     for (;;) {
329         int maxfd;
330         struct timeval s_timeout;
331         GetHandler(&SALVSYNC_readfds, &maxfd);
332         s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
333         s_timeout.tv_usec = 0;
334         /* Note: check for >= 1 below is essential since IOMGR_select
335          * doesn't have exactly same semantics as select.
336          */
337         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
338             CallHandler(&SALVSYNC_readfds);
339     }
340
341     return NULL;
342 }
343
344 static void
345 SALVSYNC_newconnection(int afd)
346 {
347 #ifdef USE_UNIX_SOCKETS
348     struct sockaddr_un other;
349 #else  /* USE_UNIX_SOCKETS */
350     struct sockaddr_in other;
351 #endif
352     int fd;
353     socklen_t junk;
354
355     junk = sizeof(other);
356     fd = accept(afd, (struct sockaddr *)&other, &junk);
357     if (fd == OSI_NULLSOCKET) {
358         osi_Panic("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
359     } else if (!AddHandler(fd, SALVSYNC_com)) {
360         AcceptOff();
361         osi_Assert(AddHandler(fd, SALVSYNC_com));
362     }
363 }
364
365 /* this function processes commands from an salvsync file descriptor (fd) */
366 static afs_int32 SALV_cnt = 0;
367 static void
368 SALVSYNC_com(osi_socket fd)
369 {
370     SYNC_command com;
371     SYNC_response res;
372     SALVSYNC_response_hdr sres_hdr;
373     SALVSYNC_command scom;
374     SALVSYNC_response sres;
375     SYNC_PROTO_BUF_DECL(buf);
376
377     memset(&com, 0, sizeof(com));
378     memset(&res, 0, sizeof(res));
379     memset(&scom, 0, sizeof(scom));
380     memset(&sres, 0, sizeof(sres));
381     memset(&sres_hdr, 0, sizeof(sres_hdr));
382
383     com.payload.buf = (void *)buf;
384     com.payload.len = SYNC_PROTO_MAX_LEN;
385     res.payload.buf = (void *) &sres_hdr;
386     res.payload.len = sizeof(sres_hdr);
387     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
388     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
389
390     scom.hdr = &com.hdr;
391     scom.sop = (SALVSYNC_command_hdr *) buf;
392     scom.com = &com;
393     sres.hdr = &res.hdr;
394     sres.sop = &sres_hdr;
395     sres.res = &res;
396
397     SALV_cnt++;
398     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
399         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
400         SALVSYNC_Drop(fd);
401         return;
402     }
403
404     if (com.recv_len < sizeof(com.hdr)) {
405         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
406         res.hdr.response = SYNC_COM_ERROR;
407         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
408         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
409         goto respond;
410     }
411
412     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
413         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
414         res.hdr.response = SYNC_COM_ERROR;
415         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
416         goto respond;
417     }
418
419     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
420         res.hdr.response = SYNC_OK;
421         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
422
423         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
424          * never wait for a response. */
425         goto done;
426     }
427
428     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
429         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
430         res.hdr.response = SYNC_COM_ERROR;
431         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
432         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
433         goto respond;
434     }
435
436     res.hdr.com_seq = com.hdr.com_seq;
437
438     VOL_LOCK;
439     switch (com.hdr.command) {
440     case SALVSYNC_NOP:
441         break;
442     case SALVSYNC_SALVAGE:
443     case SALVSYNC_RAISEPRIO:
444         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
445         break;
446     case SALVSYNC_CANCEL:
447         /* cancel a salvage */
448         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
449         break;
450     case SALVSYNC_CANCELALL:
451         /* cancel all queued salvages */
452         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
453         break;
454     case SALVSYNC_QUERY:
455         /* query whether a volume is done salvaging */
456         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
457         break;
458     case SALVSYNC_OP_LINK:
459         /* link a clone to its parent in the scheduler */
460         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
461         break;
462     default:
463         res.hdr.response = SYNC_BAD_COMMAND;
464         break;
465     }
466
467     sres_hdr.sq_len = salvageQueue.total_len;
468     sres_hdr.pq_len = pendingQueue.len;
469     VOL_UNLOCK;
470
471  respond:
472     SYNC_putRes(&salvsync_server_state, fd, &res);
473
474  done:
475     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
476         SALVSYNC_Drop(fd);
477     }
478 }
479
480 /**
481  * request that a volume be salvaged.
482  *
483  * @param[in]  com  inbound command object
484  * @param[out] res  outbound response object
485  *
486  * @return operation status
487  *    @retval SYNC_OK success
488  *    @retval SYNC_DENIED failed to enqueue request
489  *    @retval SYNC_FAILED malformed command packet
490  *
491  * @note this is a SALVSYNC protocol rpc handler
492  *
493  * @internal
494  *
495  * @post the volume is enqueued in the to-be-salvaged queue.
496  *       if the volume was already in the salvage queue, its
497  *       priority (and thus its location in the queue) are
498  *       updated.
499  */
500 static afs_int32
501 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
502 {
503     afs_int32 code = SYNC_OK;
504     struct SalvageQueueNode * node, * clone;
505     int hash = 0;
506
507     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
508         code = SYNC_FAILED;
509         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
510         goto done;
511     }
512
513     clone = LookupNodeByCommand(com->sop, &node);
514
515     if (node == NULL) {
516         if (AllocNode(&node)) {
517             code = SYNC_DENIED;
518             res->hdr->reason = SYNC_REASON_NOMEM;
519             goto done;
520         }
521         clone = node;
522         hash = 1;
523     }
524
525     HandlePrio(clone, node, com->sop->prio);
526
527     switch (node->state) {
528     case SALVSYNC_STATE_QUEUED:
529         UpdateCommandPrio(node);
530         break;
531
532     case SALVSYNC_STATE_ERROR:
533     case SALVSYNC_STATE_DONE:
534     case SALVSYNC_STATE_UNKNOWN:
535         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
536         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
537
538         /*
539          * make sure volgroup parent partition path is kept coherent
540          *
541          * If we ever want to support non-COW clones on a machine holding
542          * the RW site, please note that this code does not work under the
543          * conditions where someone zaps a COW clone on partition X, and
544          * subsequently creates a full clone on partition Y -- we'd need
545          * an inverse to SALVSYNC_com_Link.
546          *  -- tkeiser 11/28/2007
547          */
548         strcpy(node->command.sop.partName, com->sop->partName);
549
550         if (AddToSalvageQueue(node)) {
551             code = SYNC_DENIED;
552         }
553         break;
554
555     default:
556         break;
557     }
558
559     if (hash) {
560         AddNodeToHash(node);
561     }
562
563     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
564     res->sop->state = node->state;
565     res->sop->prio = node->command.sop.prio;
566
567  done:
568     return code;
569 }
570
571 /**
572  * cancel a pending salvage request.
573  *
574  * @param[in]  com  inbound command object
575  * @param[out] res  outbound response object
576  *
577  * @return operation status
578  *    @retval SYNC_OK success
579  *    @retval SYNC_FAILED malformed command packet
580  *
581  * @note this is a SALVSYNC protocol rpc handler
582  *
583  * @internal
584  */
585 static afs_int32
586 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
587 {
588     afs_int32 code = SYNC_OK;
589     struct SalvageQueueNode * node;
590
591     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
592         code = SYNC_FAILED;
593         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
594         goto done;
595     }
596
597     node = LookupNodeByCommand(com->sop, NULL);
598
599     if (node == NULL) {
600         res->sop->state = SALVSYNC_STATE_UNKNOWN;
601         res->sop->prio = 0;
602     } else {
603         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
604         res->sop->prio = node->command.sop.prio;
605         res->sop->state = node->state;
606         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
607             (node->state == SALVSYNC_STATE_QUEUED)) {
608             DeleteFromSalvageQueue(node);
609         }
610     }
611
612  done:
613     return code;
614 }
615
616 /**
617  * cancel all pending salvage requests.
618  *
619  * @param[in]  com  incoming command object
620  * @param[out] res  outbound response object
621  *
622  * @return operation status
623  *    @retval SYNC_OK success
624  *
625  * @note this is a SALVSYNC protocol rpc handler
626  *
627  * @internal
628  */
629 static afs_int32
630 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
631 {
632     struct SalvageQueueNode * np, *nnp;
633     struct DiskPartition64 * dp;
634
635     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
636         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
637             DeleteFromSalvageQueue(np);
638         }
639     }
640
641     return SYNC_OK;
642 }
643
644 /**
645  * link a queue node for a clone to its parent volume.
646  *
647  * @param[in]  com   inbound command object
648  * @param[out] res   outbound response object
649  *
650  * @return operation status
651  *    @retval SYNC_OK success
652  *    @retval SYNC_FAILED malformed command packet
653  *    @retval SYNC_DENIED the request could not be completed
654  *
655  * @note this is a SALVSYNC protocol rpc handler
656  *
657  * @post the requested volume is marked as a child of another volume.
658  *       thus, future salvage requests for this volume will result in the
659  *       parent of the volume group being scheduled for salvage instead
660  *       of this clone.
661  *
662  * @internal
663  */
664 static afs_int32
665 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
666 {
667     afs_int32 code = SYNC_OK;
668     struct SalvageQueueNode * clone, * parent;
669
670     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
671         code = SYNC_FAILED;
672         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
673         goto done;
674     }
675
676     /* lookup clone's salvage scheduling node */
677     clone = LookupNodeByCommand(com->sop, NULL);
678     if (clone == NULL) {
679         code = SYNC_DENIED;
680         res->hdr->reason = SALVSYNC_REASON_ERROR;
681         goto done;
682     }
683
684     /* lookup parent's salvage scheduling node */
685     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
686     if (parent == NULL) {
687         if (AllocNode(&parent)) {
688             code = SYNC_DENIED;
689             res->hdr->reason = SYNC_REASON_NOMEM;
690             goto done;
691         }
692         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
693         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
694         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
695         AddNodeToHash(parent);
696     }
697
698     if (LinkNode(parent, clone)) {
699         code = SYNC_DENIED;
700         goto done;
701     }
702
703  done:
704     return code;
705 }
706
707 /**
708  * query the status of a volume salvage request.
709  *
710  * @param[in]  com   inbound command object
711  * @param[out] res   outbound response object
712  *
713  * @return operation status
714  *    @retval SYNC_OK success
715  *    @retval SYNC_FAILED malformed command packet
716  *
717  * @note this is a SALVSYNC protocol rpc handler
718  *
719  * @internal
720  */
721 static afs_int32
722 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
723 {
724     afs_int32 code = SYNC_OK;
725     struct SalvageQueueNode * node;
726
727     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
728         code = SYNC_FAILED;
729         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
730         goto done;
731     }
732
733     LookupNodeByCommand(com->sop, &node);
734
735     /* query whether a volume is done salvaging */
736     if (node == NULL) {
737         res->sop->state = SALVSYNC_STATE_UNKNOWN;
738         res->sop->prio = 0;
739     } else {
740         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
741         res->sop->state = node->state;
742         res->sop->prio = node->command.sop.prio;
743     }
744
745  done:
746     return code;
747 }
748
749 static void
750 SALVSYNC_Drop(osi_socket fd)
751 {
752     RemoveHandler(fd);
753     rk_closesocket(fd);
754     AcceptOn();
755 }
756
757 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
758
759 static void
760 AcceptOn(void)
761 {
762     if (AcceptHandler == -1) {
763         osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
764         AcceptHandler = FindHandler(salvsync_server_state.fd);
765     }
766 }
767
768 static void
769 AcceptOff(void)
770 {
771     if (AcceptHandler != -1) {
772         osi_Assert(RemoveHandler(salvsync_server_state.fd));
773         AcceptHandler = -1;
774     }
775 }
776
777 /* The multiple FD handling code. */
778
779 static void
780 InitHandler(void)
781 {
782     int i;
783     ObtainWriteLock(&SALVSYNC_handler_lock);
784     for (i = 0; i < MAXHANDLERS; i++) {
785         HandlerFD[i] = OSI_NULLSOCKET;
786         HandlerProc[i] = NULL;
787     }
788     ReleaseWriteLock(&SALVSYNC_handler_lock);
789 }
790
791 static void
792 CallHandler(fd_set * fdsetp)
793 {
794     int i;
795     ObtainReadLock(&SALVSYNC_handler_lock);
796     for (i = 0; i < MAXHANDLERS; i++) {
797         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
798             ReleaseReadLock(&SALVSYNC_handler_lock);
799             (*HandlerProc[i]) (HandlerFD[i]);
800             ObtainReadLock(&SALVSYNC_handler_lock);
801         }
802     }
803     ReleaseReadLock(&SALVSYNC_handler_lock);
804 }
805
806 static int
807 AddHandler(osi_socket afd, void (*aproc) (int))
808 {
809     int i;
810     ObtainWriteLock(&SALVSYNC_handler_lock);
811     for (i = 0; i < MAXHANDLERS; i++)
812         if (HandlerFD[i] == OSI_NULLSOCKET)
813             break;
814     if (i >= MAXHANDLERS) {
815         ReleaseWriteLock(&SALVSYNC_handler_lock);
816         return 0;
817     }
818     HandlerFD[i] = afd;
819     HandlerProc[i] = aproc;
820     ReleaseWriteLock(&SALVSYNC_handler_lock);
821     return 1;
822 }
823
824 static int
825 FindHandler(osi_socket afd)
826 {
827     int i;
828     ObtainReadLock(&SALVSYNC_handler_lock);
829     for (i = 0; i < MAXHANDLERS; i++)
830         if (HandlerFD[i] == afd) {
831             ReleaseReadLock(&SALVSYNC_handler_lock);
832             return i;
833         }
834     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
835     osi_Panic("Failed to find handler\n");
836     return -1;                  /* satisfy compiler */
837 }
838
839 static int
840 FindHandler_r(osi_socket afd)
841 {
842     int i;
843     for (i = 0; i < MAXHANDLERS; i++)
844         if (HandlerFD[i] == afd) {
845             return i;
846         }
847     osi_Panic("Failed to find handler\n");
848     return -1;                  /* satisfy compiler */
849 }
850
851 static int
852 RemoveHandler(osi_socket afd)
853 {
854     ObtainWriteLock(&SALVSYNC_handler_lock);
855     HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
856     ReleaseWriteLock(&SALVSYNC_handler_lock);
857     return 1;
858 }
859
860 static void
861 GetHandler(fd_set * fdsetp, int *maxfdp)
862 {
863     int i;
864     int maxfd = -1;
865     FD_ZERO(fdsetp);
866     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
867     for (i = 0; i < MAXHANDLERS; i++)
868         if (HandlerFD[i] != OSI_NULLSOCKET) {
869             FD_SET(HandlerFD[i], fdsetp);
870 #ifndef AFS_NT40_ENV
871             /* On Windows the nfds parameter to select() is ignored */
872             if (maxfd < HandlerFD[i] || maxfd == (int)-1)
873                 maxfd = HandlerFD[i];
874 #endif
875         }
876     *maxfdp = maxfd;
877     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
878 }
879
880 /**
881  * allocate a salvage queue node.
882  *
883  * @param[out] node_out  address in which to store new node pointer
884  *
885  * @return operation status
886  *    @retval 0 success
887  *    @retval 1 failed to allocate node
888  *
889  * @internal
890  */
891 static int
892 AllocNode(struct SalvageQueueNode ** node_out)
893 {
894     int code = 0;
895     struct SalvageQueueNode * node;
896
897     *node_out = node = (struct SalvageQueueNode *)
898         malloc(sizeof(struct SalvageQueueNode));
899     if (node == NULL) {
900         code = 1;
901         goto done;
902     }
903
904     memset(node, 0, sizeof(struct SalvageQueueNode));
905     node->type = SALVSYNC_VOLGROUP_PARENT;
906     node->state = SALVSYNC_STATE_UNKNOWN;
907
908  done:
909     return code;
910 }
911
912 /**
913  * link a salvage queue node to its parent.
914  *
915  * @param[in] parent  pointer to queue node for parent of volume group
916  * @param[in] clone   pointer to queue node for a clone
917  *
918  * @return operation status
919  *    @retval 0 success
920  *    @retval 1 failure
921  *
922  * @internal
923  */
924 static int
925 LinkNode(struct SalvageQueueNode * parent,
926          struct SalvageQueueNode * clone)
927 {
928     int code = 0;
929     int idx;
930
931     /* check for attaching a clone to a clone */
932     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
933         code = 1;
934         goto done;
935     }
936
937     /* check for pre-existing registration and openings */
938     for (idx = 0; idx < VOLMAXTYPES; idx++) {
939         if (parent->volgroup.children[idx] == clone) {
940             goto linked;
941         }
942         if (parent->volgroup.children[idx] == NULL) {
943             break;
944         }
945     }
946     if (idx == VOLMAXTYPES) {
947         code = 1;
948         goto done;
949     }
950
951     /* link parent and child */
952     parent->volgroup.children[idx] = clone;
953     clone->type = SALVSYNC_VOLGROUP_CLONE;
954     clone->volgroup.parent = parent;
955
956
957  linked:
958     switch (clone->state) {
959     case SALVSYNC_STATE_QUEUED:
960         DeleteFromSalvageQueue(clone);
961
962     case SALVSYNC_STATE_SALVAGING:
963         switch (parent->state) {
964         case SALVSYNC_STATE_UNKNOWN:
965         case SALVSYNC_STATE_ERROR:
966         case SALVSYNC_STATE_DONE:
967             parent->command.sop.prio = clone->command.sop.prio;
968             AddToSalvageQueue(parent);
969             break;
970
971         case SALVSYNC_STATE_QUEUED:
972             if (clone->command.sop.prio) {
973                 parent->command.sop.prio += clone->command.sop.prio;
974                 UpdateCommandPrio(parent);
975             }
976             break;
977
978         default:
979             break;
980         }
981         break;
982
983     default:
984         break;
985     }
986
987  done:
988     return code;
989 }
990
991 static void
992 HandlePrio(struct SalvageQueueNode * clone,
993            struct SalvageQueueNode * node,
994            afs_uint32 new_prio)
995 {
996     afs_uint32 delta;
997
998     switch (node->state) {
999     case SALVSYNC_STATE_ERROR:
1000     case SALVSYNC_STATE_DONE:
1001     case SALVSYNC_STATE_UNKNOWN:
1002         node->command.sop.prio = 0;
1003         break;
1004     default:
1005         break;
1006     }
1007
1008     if (new_prio < clone->command.sop.prio) {
1009         /* strange. let's just set our delta to 1 */
1010         delta = 1;
1011     } else {
1012         delta = new_prio - clone->command.sop.prio;
1013     }
1014
1015     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1016         clone->command.sop.prio = new_prio;
1017     }
1018
1019     node->command.sop.prio += delta;
1020 }
1021
1022 static int
1023 AddToSalvageQueue(struct SalvageQueueNode * node)
1024 {
1025     afs_int32 id;
1026     struct SalvageQueueNode * last = NULL;
1027
1028     id = volutil_GetPartitionID(node->command.sop.partName);
1029     if (id < 0 || id > VOLMAXPARTS) {
1030         return 1;
1031     }
1032     if (!VGetPartitionById_r(id, 0)) {
1033         /* don't enqueue salvage requests for unmounted partitions */
1034         return 1;
1035     }
1036     if (queue_IsOnQueue(node)) {
1037         return 0;
1038     }
1039
1040     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1041         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1042     }
1043     queue_Append(&salvageQueue.part[id], node);
1044     salvageQueue.len[id]++;
1045     salvageQueue.total_len++;
1046     salvageQueue.last_insert = id;
1047     node->partition_id = id;
1048     node->state = SALVSYNC_STATE_QUEUED;
1049
1050     /* reorder, if necessary */
1051     if (last && last->command.sop.prio < node->command.sop.prio) {
1052         UpdateCommandPrio(node);
1053     }
1054
1055     CV_BROADCAST(&salvageQueue.cv);
1056     return 0;
1057 }
1058
1059 static void
1060 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1061 {
1062     if (queue_IsOnQueue(node)) {
1063         queue_Remove(node);
1064         salvageQueue.len[node->partition_id]--;
1065         salvageQueue.total_len--;
1066         node->state = SALVSYNC_STATE_UNKNOWN;
1067         CV_BROADCAST(&salvageQueue.cv);
1068     }
1069 }
1070
1071 static void
1072 AddToPendingQueue(struct SalvageQueueNode * node)
1073 {
1074     queue_Append(&pendingQueue, node);
1075     pendingQueue.len++;
1076     node->state = SALVSYNC_STATE_SALVAGING;
1077     CV_BROADCAST(&pendingQueue.queue_change_cv);
1078 }
1079
1080 static void
1081 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1082 {
1083     if (queue_IsOnQueue(node)) {
1084         queue_Remove(node);
1085         pendingQueue.len--;
1086         node->state = SALVSYNC_STATE_UNKNOWN;
1087         CV_BROADCAST(&pendingQueue.queue_change_cv);
1088     }
1089 }
1090
1091 #if 0
1092 static struct SalvageQueueNode *
1093 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1094 {
1095     struct SalvageQueueNode * np, * nnp;
1096
1097     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1098         if ((np->command.sop.volume == qry->volume) &&
1099             !strncmp(np->command.sop.partName, qry->partName,
1100                      sizeof(qry->partName)))
1101             break;
1102     }
1103
1104     if (queue_IsEnd(&pendingQueue, np))
1105         np = NULL;
1106     return np;
1107 }
1108 #endif
1109
1110 static struct SalvageQueueNode *
1111 LookupPendingCommandByPid(int pid)
1112 {
1113     struct SalvageQueueNode * np, * nnp;
1114
1115     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1116         if (np->pid == pid)
1117             break;
1118     }
1119
1120     if (queue_IsEnd(&pendingQueue, np))
1121         np = NULL;
1122     return np;
1123 }
1124
1125
1126 /* raise the priority of a previously scheduled salvage */
1127 static void
1128 UpdateCommandPrio(struct SalvageQueueNode * node)
1129 {
1130     struct SalvageQueueNode *np, *nnp;
1131     afs_int32 id;
1132     afs_uint32 prio;
1133
1134     osi_Assert(queue_IsOnQueue(node));
1135
1136     prio = node->command.sop.prio;
1137     id = node->partition_id;
1138     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1139         queue_Remove(node);
1140         queue_Prepend(&salvageQueue.part[id], node);
1141     } else {
1142         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1143             if (np->command.sop.prio > prio)
1144                 break;
1145         }
1146         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1147             queue_Remove(node);
1148             queue_Prepend(&salvageQueue.part[id], node);
1149         } else if (node != np) {
1150             queue_Remove(node);
1151             queue_InsertAfter(np, node);
1152         }
1153     }
1154 }
1155
1156 /* this will need to be rearchitected if we ever want more than one thread
1157  * to wait for new salvage nodes */
1158 struct SalvageQueueNode *
1159 SALVSYNC_getWork(void)
1160 {
1161     int i;
1162     struct DiskPartition64 * dp = NULL, * fdp;
1163     static afs_int32 next_part_sched = 0;
1164     struct SalvageQueueNode *node = NULL;
1165
1166     VOL_LOCK;
1167
1168     /*
1169      * wait for work to be scheduled
1170      * if there are no disk partitions, just sit in this wait loop forever
1171      */
1172     while (!salvageQueue.total_len || !DiskPartitionList) {
1173         VOL_CV_WAIT(&salvageQueue.cv);
1174     }
1175
1176     /*
1177      * short circuit for simple case where only one partition has
1178      * scheduled salvages
1179      */
1180     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1181         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1182         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1183         goto have_node;
1184     }
1185
1186
1187     /*
1188      * ok, more than one partition has scheduled salvages.
1189      * now search for partitions with scheduled salvages, but no pending salvages.
1190      */
1191     dp = VGetPartitionById_r(next_part_sched, 0);
1192     if (!dp) {
1193         dp = DiskPartitionList;
1194     }
1195     fdp = dp;
1196
1197     for (i=0 ;
1198          !i || dp != fdp ;
1199          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1200         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1201             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1202             goto have_node;
1203         }
1204     }
1205
1206
1207     /*
1208      * all partitions with scheduled salvages have at least one pending.
1209      * now do an exhaustive search for a scheduled salvage.
1210      */
1211     dp = fdp;
1212
1213     for (i=0 ;
1214          !i || dp != fdp ;
1215          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1216         if (salvageQueue.len[dp->index]) {
1217             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1218             goto have_node;
1219         }
1220     }
1221
1222     /* we should never reach this line */
1223     osi_Panic("Node not found\n");
1224
1225  have_node:
1226     osi_Assert(node != NULL);
1227     node->pid = 0;
1228     partition_salvaging[node->partition_id]++;
1229     DeleteFromSalvageQueue(node);
1230     AddToPendingQueue(node);
1231
1232     if (dp) {
1233         /* update next_part_sched field */
1234         if (dp->next) {
1235             next_part_sched = dp->next->index;
1236         } else if (DiskPartitionList) {
1237             next_part_sched = DiskPartitionList->index;
1238         } else {
1239             next_part_sched = -1;
1240         }
1241     }
1242
1243     VOL_UNLOCK;
1244     return node;
1245 }
1246
1247 /**
1248  * update internal scheduler state to reflect completion of a work unit.
1249  *
1250  * @param[in]  node    salvage queue node object pointer
1251  * @param[in]  result  worker process result code
1252  *
1253  * @post scheduler state is updated.
1254  *
1255  * @internal
1256  */
1257 static void
1258 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1259 {
1260     afs_int32 partid;
1261     int idx;
1262
1263     DeleteFromPendingQueue(node);
1264     partid = node->partition_id;
1265     if (partid >=0 && partid <= VOLMAXPARTS) {
1266         partition_salvaging[partid]--;
1267     }
1268     if (result == 0) {
1269         node->state = SALVSYNC_STATE_DONE;
1270     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1271         node->state = SALVSYNC_STATE_ERROR;
1272     }
1273
1274     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1275         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1276             if (node->volgroup.children[idx]) {
1277                 node->volgroup.children[idx]->state = node->state;
1278             }
1279         }
1280     }
1281 }
1282
1283 /**
1284  * check whether worker child failed.
1285  *
1286  * @param[in] status  status bitfield return by wait()
1287  *
1288  * @return boolean failure code
1289  *    @retval 0 child succeeded
1290  *    @retval 1 child failed
1291  *
1292  * @internal
1293  */
1294 static int
1295 ChildFailed(int status)
1296 {
1297     return (WCOREDUMP(status) ||
1298             WIFSIGNALED(status) ||
1299             ((WEXITSTATUS(status) != 0) &&
1300              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1301 }
1302
1303
1304 /**
1305  * notify salvsync scheduler of node completion, by child pid.
1306  *
1307  * @param[in]  pid     pid of worker child
1308  * @param[in]  status  worker status bitfield from wait()
1309  *
1310  * @post scheduler state is updated.
1311  *       if status code is a failure, fileserver notification was attempted
1312  *
1313  * @see SALVSYNC_doneWork_r
1314  */
1315 void
1316 SALVSYNC_doneWorkByPid(int pid, int status)
1317 {
1318     struct SalvageQueueNode * node;
1319     char partName[16];
1320     afs_uint32 volids[VOLMAXTYPES+1];
1321     unsigned int idx;
1322
1323     memset(volids, 0, sizeof(volids));
1324
1325     VOL_LOCK;
1326     node = LookupPendingCommandByPid(pid);
1327     if (node != NULL) {
1328         SALVSYNC_doneWork_r(node, status);
1329
1330         if (ChildFailed(status)) {
1331             /* populate volume id list for later processing outside the glock */
1332             volids[0] = node->command.sop.volume;
1333             strcpy(partName, node->command.sop.partName);
1334             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1335                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1336                     if (node->volgroup.children[idx]) {
1337                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1338                     }
1339                 }
1340             }
1341         }
1342     }
1343     VOL_UNLOCK;
1344
1345     /*
1346      * if necessary, notify fileserver of
1347      * failure to salvage volume group
1348      * [we cannot guarantee that the child made the
1349      *  appropriate notifications (e.g. SIGSEGV)]
1350      *  -- tkeiser 11/28/2007
1351      */
1352     if (ChildFailed(status)) {
1353         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1354             if (volids[idx]) {
1355                 FSYNC_VolOp(volids[idx],
1356                             partName,
1357                             FSYNC_VOL_FORCE_ERROR,
1358                             FSYNC_WHATEVER,
1359                             NULL);
1360             }
1361         }
1362     }
1363 }
1364
1365 #endif /* AFS_DEMAND_ATTACH_FS */