cfcabf527ba177769ff1493e963a1e660a9ce93c
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 RCSID
30     ("$Header$");
31
32 #include <sys/types.h>
33 #include <stdio.h>
34 #ifdef AFS_NT40_ENV
35 #include <winsock2.h>
36 #include <time.h>
37 #else
38 #include <sys/param.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <netdb.h>
42 #include <sys/time.h>
43 #endif
44 #include <errno.h>
45 #include <assert.h>
46 #include <signal.h>
47 #include <string.h>
48
49
50 #include <rx/xdr.h>
51 #include <afs/afsint.h>
52 #include "nfs.h"
53 #include <afs/errors.h>
54 #include "salvsync.h"
55 #include "lwp.h"
56 #include "lock.h"
57 #include <afs/afssyscalls.h>
58 #include "ihandle.h"
59 #include "vnode.h"
60 #include "volume.h"
61 #include "partition.h"
62 #include <rx/rx_queue.h>
63 #include <afs/procmgmt.h>
64
65 #if !defined(offsetof)
66 #include <stddef.h>
67 #endif
68
69 #ifdef USE_UNIX_SOCKETS
70 #include <afs/afsutil.h>
71 #include <sys/un.h>
72 #endif
73
74
75 /*@printflike@*/ extern void Log(const char *format, ...);
76
77 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
78                                  * move = dump+restore can run on single server */
79
80 /* Forward declarations */
81 static void * SALVSYNC_syncThread(void *);
82 static void SALVSYNC_newconnection(int fd);
83 static void SALVSYNC_com(int fd);
84 static void SALVSYNC_Drop(int fd);
85 static void AcceptOn(void);
86 static void AcceptOff(void);
87 static void InitHandler(void);
88 static void CallHandler(fd_set * fdsetp);
89 static int AddHandler(int afd, void (*aproc) (int));
90 static int FindHandler(register int afd);
91 static int FindHandler_r(register int afd);
92 static int RemoveHandler(register int afd);
93 static void GetHandler(fd_set * fdsetp, int *maxfdp);
94
95
96 /*
97  * This lock controls access to the handler array.
98  */
99 struct Lock SALVSYNC_handler_lock;
100
101
102 #ifdef AFS_DEMAND_ATTACH_FS
103 /*
104  * SALVSYNC is a feature specific to the demand attach fileserver
105  */
106
107 static int AllocNode(struct SalvageQueueNode ** node);
108
109 static int AddToSalvageQueue(struct SalvageQueueNode * node);
110 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
111 static void AddToPendingQueue(struct SalvageQueueNode * node);
112 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
113 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
114 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
115 static void UpdateCommandPrio(struct SalvageQueueNode * node);
116 static void HandlePrio(struct SalvageQueueNode * clone, 
117                        struct SalvageQueueNode * parent,
118                        afs_uint32 new_prio);
119
120 static int LinkNode(struct SalvageQueueNode * parent,
121                     struct SalvageQueueNode * clone);
122
123 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName, 
124                                             struct SalvageQueueNode ** parent);
125 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
126                                                      struct SalvageQueueNode ** parent);
127 static void AddNodeToHash(struct SalvageQueueNode * node);
128 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
129
130 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
131 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
132 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
133 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
134 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
135
136
137 extern int LogLevel;
138 extern int VInit;
139 extern pthread_mutex_t vol_salvsync_mutex;
140
141 /**
142  * salvsync server socket handle.
143  */
144 static SYNC_server_state_t salvsync_server_state = 
145     { -1,                       /* file descriptor */
146       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
147       SALVSYNC_PROTO_VERSION,   /* protocol version */
148       5,                        /* bind() retry limit */
149       100,                      /* listen() queue depth */
150       "SALVSYNC",               /* protocol name string */
151     };
152
153
154 /**
155  * queue of all volumes waiting to be salvaged.
156  */
157 struct SalvageQueue {
158     volatile int total_len;
159     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
160     volatile int len[VOLMAXPARTS+1];
161     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
162     pthread_cond_t cv;
163 };
164 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
165
166 /**
167  * queue of all volumes currently being salvaged.
168  */
169 struct QueueHead {
170     volatile struct rx_queue q;  /**< queue of salvages in progress */
171     volatile int len;            /**< length of in-progress queue */
172     pthread_cond_t queue_change_cv;
173 };
174 static struct QueueHead pendingQueue;  /* volumes being salvaged */
175
176 /* XXX
177  * whether a partition has a salvage in progress
178  *
179  * the salvager code only permits one salvage per partition at a time
180  *
181  * the following hack tries to keep salvaged parallelism high by
182  * only permitting one salvage dispatch per partition at a time
183  *
184  * unfortunately, the parallel salvager currently
185  * has a rather braindead routine that won't permit
186  * multiple salvages on the same "device".  this
187  * function happens to break pretty badly on lvm, raid luns, etc.
188  *
189  * this hack isn't good enough to stop the device limiting code from
190  * crippling performance.  someday that code needs to be rewritten
191  */
192 static int partition_salvaging[VOLMAXPARTS+1];
193
194 #define VSHASH_SIZE 64
195 #define VSHASH_MASK (VSHASH_SIZE-1)
196 #define VSHASH(vid) ((vid)&VSHASH_MASK)
197
198 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
199
200 static struct SalvageQueueNode *
201 LookupNode(afs_uint32 vid, char * partName,
202            struct SalvageQueueNode ** parent)
203 {
204     struct rx_queue *qp, *nqp;
205     struct SalvageQueueNode *vsp;
206     int idx = VSHASH(vid);
207
208     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
209         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
210         if ((vsp->command.sop.volume == vid) &&
211             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
212             break;
213         }
214     }
215
216     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
217         vsp = NULL;
218     }
219
220     if (parent) {
221         if (vsp) {
222             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
223                 vsp->volgroup.parent : vsp;
224         } else {
225             *parent = NULL;
226         }
227     }
228
229     return vsp;
230 }
231
232 static struct SalvageQueueNode *
233 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
234                     struct SalvageQueueNode ** parent)
235 {
236     return LookupNode(qry->volume, qry->partName, parent);
237 }
238
239 static void
240 AddNodeToHash(struct SalvageQueueNode * node)
241 {
242     int idx = VSHASH(node->command.sop.volume);
243
244     if (queue_IsOnQueue(&node->hash_chain)) {
245         return;
246     }
247
248     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
249     SalvageHashTable[idx].len++;
250 }
251
252 static void
253 DeleteNodeFromHash(struct SalvageQueueNode * node)
254 {
255     int idx = VSHASH(node->command.sop.volume);
256
257     if (queue_IsNotOnQueue(&node->hash_chain)) {
258         return;
259     }
260
261     queue_Remove(&node->hash_chain);
262     SalvageHashTable[idx].len--;
263 }
264
265 void
266 SALVSYNC_salvInit(void)
267 {
268     int i;
269     pthread_t tid;
270     pthread_attr_t tattr;
271
272     /* initialize the queues */
273     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
274     for (i = 0; i <= VOLMAXPARTS; i++) {
275         queue_Init(&salvageQueue.part[i]);
276         salvageQueue.len[i] = 0;
277     }
278     assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
279     queue_Init(&pendingQueue);
280     salvageQueue.total_len = pendingQueue.len = 0;
281     salvageQueue.last_insert = -1;
282     memset(partition_salvaging, 0, sizeof(partition_salvaging));
283
284     for (i = 0; i < VSHASH_SIZE; i++) {
285         assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
286         SalvageHashTable[i].len = 0;
287         queue_Init(&SalvageHashTable[i]);
288     }
289
290     /* start the salvsync thread */
291     assert(pthread_attr_init(&tattr) == 0);
292     assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
293     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
294 }
295
296
297 static fd_set SALVSYNC_readfds;
298
299 static void *
300 SALVSYNC_syncThread(void * args)
301 {
302     int on = 1;
303     int code;
304     int numTries;
305     int tid;
306     SYNC_server_state_t * state = &salvsync_server_state;
307
308     SYNC_getAddr(&state->endpoint, &state->addr);
309     SYNC_cleanupSock(state);
310
311 #ifndef AFS_NT40_ENV
312     (void)signal(SIGPIPE, SIG_IGN);
313 #endif
314
315     state->fd = SYNC_getSock(&state->endpoint);
316     code = SYNC_bindSock(state);
317     assert(!code);
318
319     InitHandler();
320     AcceptOn();
321
322     for (;;) {
323         int maxfd;
324         GetHandler(&SALVSYNC_readfds, &maxfd);
325         /* Note: check for >= 1 below is essential since IOMGR_select
326          * doesn't have exactly same semantics as select.
327          */
328         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
329             CallHandler(&SALVSYNC_readfds);
330     }
331
332     return NULL;
333 }
334
335 static void
336 SALVSYNC_newconnection(int afd)
337 {
338 #ifdef USE_UNIX_SOCKETS
339     struct sockaddr_un other;
340 #else  /* USE_UNIX_SOCKETS */
341     struct sockaddr_in other;
342 #endif
343     int junk, fd;
344     junk = sizeof(other);
345     fd = accept(afd, (struct sockaddr *)&other, &junk);
346     if (fd == -1) {
347         Log("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
348         assert(1 == 2);
349     } else if (!AddHandler(fd, SALVSYNC_com)) {
350         AcceptOff();
351         assert(AddHandler(fd, SALVSYNC_com));
352     }
353 }
354
355 /* this function processes commands from an salvsync file descriptor (fd) */
356 static afs_int32 SALV_cnt = 0;
357 static void
358 SALVSYNC_com(int fd)
359 {
360     SYNC_command com;
361     SYNC_response res;
362     SALVSYNC_response_hdr sres_hdr;
363     SALVSYNC_command scom;
364     SALVSYNC_response sres;
365     SYNC_PROTO_BUF_DECL(buf);
366     
367     com.payload.buf = (void *)buf;
368     com.payload.len = SYNC_PROTO_MAX_LEN;
369     res.payload.buf = (void *) &sres_hdr;
370     res.payload.len = sizeof(sres_hdr);
371     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
372     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
373
374     scom.hdr = &com.hdr;
375     scom.sop = (SALVSYNC_command_hdr *) buf;
376     scom.com = &com;
377     sres.hdr = &res.hdr;
378     sres.sop = &sres_hdr;
379     sres.res = &res;
380
381     SALV_cnt++;
382     if (SYNC_getCom(fd, &com)) {
383         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
384         SALVSYNC_Drop(fd);
385         return;
386     }
387
388     if (com.recv_len < sizeof(com.hdr)) {
389         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
390         res.hdr.response = SYNC_COM_ERROR;
391         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
392         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
393         goto respond;
394     }
395
396     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
397         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
398         res.hdr.response = SYNC_COM_ERROR;
399         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
400         goto respond;
401     }
402
403     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
404         res.hdr.response = SYNC_OK;
405         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
406         goto respond;
407     }
408
409     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
410         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
411         res.hdr.response = SYNC_COM_ERROR;
412         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
413         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
414         goto respond;
415     }
416
417     VOL_LOCK;
418     switch (com.hdr.command) {
419     case SALVSYNC_NOP:
420         break;
421     case SALVSYNC_SALVAGE:
422     case SALVSYNC_RAISEPRIO:
423         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
424         break;
425     case SALVSYNC_CANCEL:
426         /* cancel a salvage */
427         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
428         break;
429     case SALVSYNC_CANCELALL:
430         /* cancel all queued salvages */
431         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
432         break;
433     case SALVSYNC_QUERY:
434         /* query whether a volume is done salvaging */
435         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
436         break;
437     case SALVSYNC_OP_LINK:
438         /* link a clone to its parent in the scheduler */
439         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
440         break;
441     default:
442         res.hdr.response = SYNC_BAD_COMMAND;
443         break;
444     }
445
446     sres_hdr.sq_len = salvageQueue.total_len;
447     sres_hdr.pq_len = pendingQueue.len;
448     VOL_UNLOCK;
449
450  respond:
451     SYNC_putRes(fd, &res);
452     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
453         SALVSYNC_Drop(fd);
454     }
455 }
456
457 /**
458  * request that a volume be salvaged.
459  *
460  * @param[in]  com  inbound command object
461  * @param[out] res  outbound response object
462  *
463  * @return operation status
464  *    @retval SYNC_OK success
465  *    @retval SYNC_DENIED failed to enqueue request
466  *    @retval SYNC_FAILED malformed command packet
467  *
468  * @note this is a SALVSYNC protocol rpc handler
469  *
470  * @internal
471  *
472  * @post the volume is enqueued in the to-be-salvaged queue.  
473  *       if the volume was already in the salvage queue, its 
474  *       priority (and thus its location in the queue) are 
475  *       updated.
476  */
477 static afs_int32
478 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
479 {
480     afs_int32 code = SYNC_OK;
481     struct SalvageQueueNode * node, * clone;
482     int hash = 0;
483
484     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
485         code = SYNC_FAILED;
486         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
487         goto done;
488     }
489
490     clone = LookupNodeByCommand(com->sop, &node);
491
492     if (node == NULL) {
493         if (AllocNode(&node)) {
494             code = SYNC_DENIED;
495             res->hdr->reason = SYNC_REASON_NOMEM;
496             goto done;
497         }
498         clone = node;
499         hash = 1;
500     }
501
502     HandlePrio(clone, node, com->sop->prio);
503
504     switch (node->state) {
505     case SALVSYNC_STATE_QUEUED:
506         UpdateCommandPrio(node);
507         break;
508
509     case SALVSYNC_STATE_ERROR:
510     case SALVSYNC_STATE_DONE:
511     case SALVSYNC_STATE_UNKNOWN:
512         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
513         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
514
515         /* 
516          * make sure volgroup parent partition path is kept coherent
517          *
518          * If we ever want to support non-COW clones on a machine holding
519          * the RW site, please note that this code does not work under the
520          * conditions where someone zaps a COW clone on partition X, and
521          * subsequently creates a full clone on partition Y -- we'd need
522          * an inverse to SALVSYNC_com_Link.
523          *  -- tkeiser 11/28/2007
524          */
525         strcpy(node->command.sop.partName, com->sop->partName);
526
527         if (AddToSalvageQueue(node)) {
528             code = SYNC_DENIED;
529         }
530         break;
531
532     default:
533         break;
534     }
535
536     if (hash) {
537         AddNodeToHash(node);
538     }
539
540     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
541     res->sop->state = node->state;
542     res->sop->prio = node->command.sop.prio;
543
544  done:
545     return code;
546 }
547
548 /**
549  * cancel a pending salvage request.
550  *
551  * @param[in]  com  inbound command object
552  * @param[out] res  outbound response object
553  *
554  * @return operation status
555  *    @retval SYNC_OK success
556  *    @retval SYNC_FAILED malformed command packet
557  *
558  * @note this is a SALVSYNC protocol rpc handler
559  *
560  * @internal
561  */
562 static afs_int32
563 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
564 {
565     afs_int32 code = SYNC_OK;
566     struct SalvageQueueNode * node;
567
568     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
569         code = SYNC_FAILED;
570         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
571         goto done;
572     }
573
574     node = LookupNodeByCommand(com->sop, NULL);
575
576     if (node == NULL) {
577         res->sop->state = SALVSYNC_STATE_UNKNOWN;
578         res->sop->prio = 0;
579     } else {
580         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
581         res->sop->prio = node->command.sop.prio;
582         res->sop->state = node->state;
583         if ((node->type == SALVSYNC_VOLGROUP_PARENT) && 
584             (node->state == SALVSYNC_STATE_QUEUED)) {
585             DeleteFromSalvageQueue(node);
586         }
587     }
588
589  done:
590     return code;
591 }
592
593 /**
594  * cancel all pending salvage requests.
595  *
596  * @param[in]  com  incoming command object
597  * @param[out] res  outbound response object
598  *
599  * @return operation status
600  *    @retval SYNC_OK success
601  *
602  * @note this is a SALVSYNC protocol rpc handler
603  *
604  * @internal
605  */
606 static afs_int32
607 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
608 {
609     struct SalvageQueueNode * np, *nnp;
610     struct DiskPartition64 * dp;
611
612     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
613         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
614             DeleteFromSalvageQueue(np);
615         }
616     }
617
618     return SYNC_OK;
619 }
620
621 /**
622  * link a queue node for a clone to its parent volume.
623  *
624  * @param[in]  com   inbound command object
625  * @param[out] res   outbound response object
626  *
627  * @return operation status
628  *    @retval SYNC_OK success
629  *    @retval SYNC_FAILED malformed command packet
630  *    @retval SYNC_DENIED the request could not be completed
631  *
632  * @note this is a SALVSYNC protocol rpc handler
633  *
634  * @post the requested volume is marked as a child of another volume.
635  *       thus, future salvage requests for this volume will result in the
636  *       parent of the volume group being scheduled for salvage instead
637  *       of this clone.
638  *
639  * @internal
640  */
641 static afs_int32
642 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
643 {
644     afs_int32 code = SYNC_OK;
645     struct SalvageQueueNode * clone, * parent;
646
647     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
648         code = SYNC_FAILED;
649         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
650         goto done;
651     }
652
653     /* lookup clone's salvage scheduling node */
654     clone = LookupNodeByCommand(com->sop, NULL);
655     if (clone == NULL) {
656         code = SYNC_DENIED;
657         res->hdr->reason = SALVSYNC_REASON_ERROR;
658         goto done;
659     }
660
661     /* lookup parent's salvage scheduling node */
662     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
663     if (parent == NULL) {
664         if (AllocNode(&parent)) {
665             code = SYNC_DENIED;
666             res->hdr->reason = SYNC_REASON_NOMEM;
667             goto done;
668         }
669         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
670         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
671         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
672         AddNodeToHash(parent);
673     }
674
675     if (LinkNode(parent, clone)) {
676         code = SYNC_DENIED;
677         goto done;
678     }
679
680  done:
681     return code;
682 }
683
684 /**
685  * query the status of a volume salvage request.
686  *
687  * @param[in]  com   inbound command object
688  * @param[out] res   outbound response object
689  *
690  * @return operation status
691  *    @retval SYNC_OK success
692  *    @retval SYNC_FAILED malformed command packet
693  *
694  * @note this is a SALVSYNC protocol rpc handler
695  *
696  * @internal
697  */
698 static afs_int32
699 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
700 {
701     afs_int32 code = SYNC_OK;
702     struct SalvageQueueNode * node;
703
704     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
705         code = SYNC_FAILED;
706         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
707         goto done;
708     }
709
710     LookupNodeByCommand(com->sop, &node);
711
712     /* query whether a volume is done salvaging */
713     if (node == NULL) {
714         res->sop->state = SALVSYNC_STATE_UNKNOWN;
715         res->sop->prio = 0;
716     } else {
717         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
718         res->sop->state = node->state;
719         res->sop->prio = node->command.sop.prio;
720     }
721
722  done:
723     return code;
724 }
725
726 static void
727 SALVSYNC_Drop(int fd)
728 {
729     RemoveHandler(fd);
730 #ifdef AFS_NT40_ENV
731     closesocket(fd);
732 #else
733     close(fd);
734 #endif
735     AcceptOn();
736 }
737
738 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
739
740 static void
741 AcceptOn(void)
742 {
743     if (AcceptHandler == -1) {
744         assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
745         AcceptHandler = FindHandler(salvsync_server_state.fd);
746     }
747 }
748
749 static void
750 AcceptOff(void)
751 {
752     if (AcceptHandler != -1) {
753         assert(RemoveHandler(salvsync_server_state.fd));
754         AcceptHandler = -1;
755     }
756 }
757
758 /* The multiple FD handling code. */
759
760 static int HandlerFD[MAXHANDLERS];
761 static void (*HandlerProc[MAXHANDLERS]) (int);
762
763 static void
764 InitHandler(void)
765 {
766     register int i;
767     ObtainWriteLock(&SALVSYNC_handler_lock);
768     for (i = 0; i < MAXHANDLERS; i++) {
769         HandlerFD[i] = -1;
770         HandlerProc[i] = NULL;
771     }
772     ReleaseWriteLock(&SALVSYNC_handler_lock);
773 }
774
775 static void
776 CallHandler(fd_set * fdsetp)
777 {
778     register int i;
779     ObtainReadLock(&SALVSYNC_handler_lock);
780     for (i = 0; i < MAXHANDLERS; i++) {
781         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
782             ReleaseReadLock(&SALVSYNC_handler_lock);
783             (*HandlerProc[i]) (HandlerFD[i]);
784             ObtainReadLock(&SALVSYNC_handler_lock);
785         }
786     }
787     ReleaseReadLock(&SALVSYNC_handler_lock);
788 }
789
790 static int
791 AddHandler(int afd, void (*aproc) (int))
792 {
793     register int i;
794     ObtainWriteLock(&SALVSYNC_handler_lock);
795     for (i = 0; i < MAXHANDLERS; i++)
796         if (HandlerFD[i] == -1)
797             break;
798     if (i >= MAXHANDLERS) {
799         ReleaseWriteLock(&SALVSYNC_handler_lock);
800         return 0;
801     }
802     HandlerFD[i] = afd;
803     HandlerProc[i] = aproc;
804     ReleaseWriteLock(&SALVSYNC_handler_lock);
805     return 1;
806 }
807
808 static int
809 FindHandler(register int afd)
810 {
811     register int i;
812     ObtainReadLock(&SALVSYNC_handler_lock);
813     for (i = 0; i < MAXHANDLERS; i++)
814         if (HandlerFD[i] == afd) {
815             ReleaseReadLock(&SALVSYNC_handler_lock);
816             return i;
817         }
818     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
819     assert(1 == 2);
820     return -1;                  /* satisfy compiler */
821 }
822
823 static int
824 FindHandler_r(register int afd)
825 {
826     register int i;
827     for (i = 0; i < MAXHANDLERS; i++)
828         if (HandlerFD[i] == afd) {
829             return i;
830         }
831     assert(1 == 2);
832     return -1;                  /* satisfy compiler */
833 }
834
835 static int
836 RemoveHandler(register int afd)
837 {
838     ObtainWriteLock(&SALVSYNC_handler_lock);
839     HandlerFD[FindHandler_r(afd)] = -1;
840     ReleaseWriteLock(&SALVSYNC_handler_lock);
841     return 1;
842 }
843
844 static void
845 GetHandler(fd_set * fdsetp, int *maxfdp)
846 {
847     register int i;
848     register int maxfd = -1;
849     FD_ZERO(fdsetp);
850     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
851     for (i = 0; i < MAXHANDLERS; i++)
852         if (HandlerFD[i] != -1) {
853             FD_SET(HandlerFD[i], fdsetp);
854             if (maxfd < HandlerFD[i])
855                 maxfd = HandlerFD[i];
856         }
857     *maxfdp = maxfd;
858     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
859 }
860
861 /**
862  * allocate a salvage queue node.
863  *
864  * @param[out] node_out  address in which to store new node pointer
865  *
866  * @return operation status
867  *    @retval 0 success
868  *    @retval 1 failed to allocate node
869  *
870  * @internal
871  */
872 static int
873 AllocNode(struct SalvageQueueNode ** node_out)
874 {
875     int code = 0;
876     struct SalvageQueueNode * node;
877
878     *node_out = node = (struct SalvageQueueNode *) 
879         malloc(sizeof(struct SalvageQueueNode));
880     if (node == NULL) {
881         code = 1;
882         goto done;
883     }
884
885     memset(node, 0, sizeof(struct SalvageQueueNode));
886     node->type = SALVSYNC_VOLGROUP_PARENT;
887     node->state = SALVSYNC_STATE_UNKNOWN;
888
889  done:
890     return code;
891 }
892
893 /**
894  * link a salvage queue node to its parent.
895  *
896  * @param[in] parent  pointer to queue node for parent of volume group
897  * @param[in] clone   pointer to queue node for a clone
898  *
899  * @return operation status
900  *    @retval 0 success
901  *    @retval 1 failure
902  *
903  * @internal
904  */
905 static int
906 LinkNode(struct SalvageQueueNode * parent,
907          struct SalvageQueueNode * clone)
908 {
909     int code = 0;
910     int idx;
911
912     /* check for attaching a clone to a clone */
913     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
914         code = 1;
915         goto done;
916     }
917
918     /* check for pre-existing registration and openings */
919     for (idx = 0; idx < VOLMAXTYPES; idx++) {
920         if (parent->volgroup.children[idx] == clone) {
921             goto linked;
922         }
923         if (parent->volgroup.children[idx] == NULL) {
924             break;
925         }
926     }
927     if (idx == VOLMAXTYPES) {
928         code = 1;
929         goto done;
930     }
931
932     /* link parent and child */
933     parent->volgroup.children[idx] = clone;
934     clone->type = SALVSYNC_VOLGROUP_CLONE;
935     clone->volgroup.parent = parent;
936
937
938  linked:
939     switch (clone->state) {
940     case SALVSYNC_STATE_QUEUED:
941         DeleteFromSalvageQueue(clone);
942
943     case SALVSYNC_STATE_SALVAGING:
944         switch (parent->state) {
945         case SALVSYNC_STATE_UNKNOWN:
946         case SALVSYNC_STATE_ERROR:
947         case SALVSYNC_STATE_DONE:
948             parent->command.sop.prio = clone->command.sop.prio;
949             AddToSalvageQueue(parent);
950             break;
951
952         case SALVSYNC_STATE_QUEUED:
953             if (clone->command.sop.prio) {
954                 parent->command.sop.prio += clone->command.sop.prio;
955                 UpdateCommandPrio(parent);
956             }
957             break;
958
959         default:
960             break;
961         }
962         break;
963
964     default:
965         break;
966     }
967
968  done:
969     return code;
970 }
971
972 static void
973 HandlePrio(struct SalvageQueueNode * clone, 
974            struct SalvageQueueNode * node,
975            afs_uint32 new_prio)
976 {
977     afs_uint32 delta;
978
979     switch (node->state) {
980     case SALVSYNC_STATE_ERROR:
981     case SALVSYNC_STATE_DONE:
982     case SALVSYNC_STATE_UNKNOWN:
983         node->command.sop.prio = 0;
984         break;
985     }
986
987     if (new_prio < clone->command.sop.prio) {
988         /* strange. let's just set our delta to 1 */
989         delta = 1;
990     } else {
991         delta = new_prio - clone->command.sop.prio;
992     }
993
994     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
995         clone->command.sop.prio = new_prio;
996     }
997
998     node->command.sop.prio += delta;
999 }
1000
1001 static int
1002 AddToSalvageQueue(struct SalvageQueueNode * node)
1003 {
1004     afs_int32 id;
1005     struct SalvageQueueNode * last = NULL;
1006
1007     id = volutil_GetPartitionID(node->command.sop.partName);
1008     if (id < 0 || id > VOLMAXPARTS) {
1009         return 1;
1010     }
1011     if (!VGetPartitionById_r(id, 0)) {
1012         /* don't enqueue salvage requests for unmounted partitions */
1013         return 1;
1014     }
1015     if (queue_IsOnQueue(node)) {
1016         return 0;
1017     }
1018
1019     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1020         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1021     }
1022     queue_Append(&salvageQueue.part[id], node);
1023     salvageQueue.len[id]++;
1024     salvageQueue.total_len++;
1025     salvageQueue.last_insert = id;
1026     node->partition_id = id;
1027     node->state = SALVSYNC_STATE_QUEUED;
1028
1029     /* reorder, if necessary */
1030     if (last && last->command.sop.prio < node->command.sop.prio) {
1031         UpdateCommandPrio(node);
1032     }
1033
1034     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1035     return 0;
1036 }
1037
1038 static void
1039 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1040 {
1041     if (queue_IsOnQueue(node)) {
1042         queue_Remove(node);
1043         salvageQueue.len[node->partition_id]--;
1044         salvageQueue.total_len--;
1045         node->state = SALVSYNC_STATE_UNKNOWN;
1046         assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1047     }
1048 }
1049
1050 static void
1051 AddToPendingQueue(struct SalvageQueueNode * node)
1052 {
1053     queue_Append(&pendingQueue, node);
1054     pendingQueue.len++;
1055     node->state = SALVSYNC_STATE_SALVAGING;
1056     assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1057 }
1058
1059 static void
1060 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1061 {
1062     if (queue_IsOnQueue(node)) {
1063         queue_Remove(node);
1064         pendingQueue.len--;
1065         node->state = SALVSYNC_STATE_UNKNOWN;
1066         assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1067     }
1068 }
1069
1070 static struct SalvageQueueNode *
1071 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1072 {
1073     struct SalvageQueueNode * np, * nnp;
1074
1075     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1076         if ((np->command.sop.volume == qry->volume) && 
1077             !strncmp(np->command.sop.partName, qry->partName,
1078                      sizeof(qry->partName)))
1079             break;
1080     }
1081
1082     if (queue_IsEnd(&pendingQueue, np))
1083         np = NULL;
1084     return np;
1085 }
1086
1087 static struct SalvageQueueNode *
1088 LookupPendingCommandByPid(int pid)
1089 {
1090     struct SalvageQueueNode * np, * nnp;
1091
1092     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1093         if (np->pid == pid)
1094             break;
1095     }
1096
1097     if (queue_IsEnd(&pendingQueue, np))
1098         np = NULL;
1099     return np;
1100 }
1101
1102
1103 /* raise the priority of a previously scheduled salvage */
1104 static void
1105 UpdateCommandPrio(struct SalvageQueueNode * node)
1106 {
1107     struct SalvageQueueNode *np, *nnp;
1108     afs_int32 id;
1109     afs_uint32 prio;
1110
1111     assert(queue_IsOnQueue(node));
1112
1113     prio = node->command.sop.prio;
1114     id = node->partition_id;
1115     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1116         queue_Remove(node);
1117         queue_Prepend(&salvageQueue.part[id], node);
1118     } else {
1119         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1120             if (np->command.sop.prio > prio)
1121                 break;
1122         }
1123         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1124             queue_Remove(node);
1125             queue_Prepend(&salvageQueue.part[id], node);
1126         } else if (node != np) {
1127             queue_Remove(node);
1128             queue_InsertAfter(np, node);
1129         }
1130     }
1131 }
1132
1133 /* this will need to be rearchitected if we ever want more than one thread
1134  * to wait for new salvage nodes */
1135 struct SalvageQueueNode * 
1136 SALVSYNC_getWork(void)
1137 {
1138     int i, ret;
1139     struct DiskPartition64 * dp = NULL, * fdp;
1140     static afs_int32 next_part_sched = 0;
1141     struct SalvageQueueNode *node = NULL, *np;
1142
1143     VOL_LOCK;
1144
1145     /*
1146      * wait for work to be scheduled
1147      * if there are no disk partitions, just sit in this wait loop forever
1148      */
1149     while (!salvageQueue.total_len || !DiskPartitionList) {
1150         VOL_CV_WAIT(&salvageQueue.cv);
1151     }
1152
1153     /* 
1154      * short circuit for simple case where only one partition has
1155      * scheduled salvages
1156      */
1157     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1158         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1159         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1160         goto have_node;
1161     }
1162
1163
1164     /* 
1165      * ok, more than one partition has scheduled salvages.
1166      * now search for partitions with scheduled salvages, but no pending salvages. 
1167      */
1168     dp = VGetPartitionById_r(next_part_sched, 0);
1169     if (!dp) {
1170         dp = DiskPartitionList;
1171     }
1172     fdp = dp;
1173
1174     for (i=0 ; 
1175          !i || dp != fdp ; 
1176          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1177         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1178             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1179             goto have_node;
1180         }
1181     }
1182
1183
1184     /*
1185      * all partitions with scheduled salvages have at least one pending.
1186      * now do an exhaustive search for a scheduled salvage.
1187      */
1188     dp = fdp;
1189
1190     for (i=0 ; 
1191          !i || dp != fdp ; 
1192          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1193         if (salvageQueue.len[dp->index]) {
1194             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1195             goto have_node;
1196         }
1197     }
1198
1199     /* we should never reach this line */
1200     assert(1==2);
1201
1202  have_node:
1203     assert(node != NULL);
1204     node->pid = 0;
1205     partition_salvaging[node->partition_id]++;
1206     DeleteFromSalvageQueue(node);
1207     AddToPendingQueue(node);
1208
1209     if (dp) {
1210         /* update next_part_sched field */
1211         if (dp->next) {
1212             next_part_sched = dp->next->index;
1213         } else if (DiskPartitionList) {
1214             next_part_sched = DiskPartitionList->index;
1215         } else {
1216             next_part_sched = -1;
1217         }
1218     }
1219
1220  bail:
1221     VOL_UNLOCK;
1222     return node;
1223 }
1224
1225 /**
1226  * update internal scheduler state to reflect completion of a work unit.
1227  *
1228  * @param[in]  node    salvage queue node object pointer
1229  * @param[in]  result  worker process result code
1230  *
1231  * @post scheduler state is updated.
1232  *
1233  * @internal
1234  */
1235 static void
1236 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1237 {
1238     afs_int32 partid;
1239     int idx;
1240
1241     DeleteFromPendingQueue(node);
1242     partid = node->partition_id;
1243     if (partid >=0 && partid <= VOLMAXPARTS) {
1244         partition_salvaging[partid]--;
1245     }
1246     if (result == 0) {
1247         node->state = SALVSYNC_STATE_DONE;
1248     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1249         node->state = SALVSYNC_STATE_ERROR;
1250     }
1251
1252     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1253         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1254             if (node->volgroup.children[idx]) {
1255                 node->volgroup.children[idx]->state = node->state;
1256             }
1257         }
1258     }
1259 }
1260
1261 /**
1262  * check whether worker child failed.
1263  *
1264  * @param[in] status  status bitfield return by wait()
1265  *
1266  * @return boolean failure code
1267  *    @retval 0 child succeeded
1268  *    @retval 1 child failed
1269  *
1270  * @internal
1271  */
1272 static int
1273 ChildFailed(int status)
1274 {
1275     return (WCOREDUMP(status) || 
1276             WIFSIGNALED(status) || 
1277             ((WEXITSTATUS(status) != 0) && 
1278              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1279 }
1280
1281
1282 /**
1283  * notify salvsync scheduler of node completion, by child pid.
1284  *
1285  * @param[in]  pid     pid of worker child
1286  * @param[in]  status  worker status bitfield from wait()
1287  *
1288  * @post scheduler state is updated.
1289  *       if status code is a failure, fileserver notification was attempted
1290  *
1291  * @see SALVSYNC_doneWork_r
1292  */
1293 void
1294 SALVSYNC_doneWorkByPid(int pid, int status)
1295 {
1296     struct SalvageQueueNode * node;
1297     char partName[16];
1298     afs_uint32 volids[VOLMAXTYPES+1];
1299     unsigned int idx;
1300
1301     memset(volids, 0, sizeof(volids));
1302
1303     VOL_LOCK;
1304     node = LookupPendingCommandByPid(pid);
1305     if (node != NULL) {
1306         SALVSYNC_doneWork_r(node, status);
1307
1308         if (ChildFailed(status)) {
1309             /* populate volume id list for later processing outside the glock */
1310             volids[0] = node->command.sop.volume;
1311             strcpy(partName, node->command.sop.partName);
1312             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1313                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1314                     if (node->volgroup.children[idx]) {
1315                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1316                     }
1317                 }
1318             }
1319         }
1320     }
1321     VOL_UNLOCK;
1322
1323     /*
1324      * if necessary, notify fileserver of
1325      * failure to salvage volume group
1326      * [we cannot guarantee that the child made the
1327      *  appropriate notifications (e.g. SIGSEGV)]
1328      *  -- tkeiser 11/28/2007
1329      */
1330     if (ChildFailed(status)) {
1331         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1332             if (volids[idx]) {
1333                 FSYNC_VolOp(volids[idx],
1334                             partName,
1335                             FSYNC_VOL_FORCE_ERROR,
1336                             FSYNC_WHATEVER,
1337                             NULL);
1338             }
1339         }
1340     }
1341 }
1342
1343 #endif /* AFS_DEMAND_ATTACH_FS */