6de28b87918107321c39cd7f716ff569784f8b3c
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 RCSID
30     ("$Header$");
31
32 #include <sys/types.h>
33 #include <stdio.h>
34 #ifdef AFS_NT40_ENV
35 #include <winsock2.h>
36 #include <time.h>
37 #else
38 #include <sys/param.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <netdb.h>
42 #include <sys/time.h>
43 #endif
44 #include <errno.h>
45 #include <assert.h>
46 #include <signal.h>
47 #include <string.h>
48
49
50 #include <rx/xdr.h>
51 #include <afs/afsint.h>
52 #include "nfs.h"
53 #include <afs/errors.h>
54 #include "salvsync.h"
55 #include "lwp.h"
56 #include "lock.h"
57 #include <afs/afssyscalls.h>
58 #include "ihandle.h"
59 #include "vnode.h"
60 #include "volume.h"
61 #include "partition.h"
62 #include <rx/rx_queue.h>
63 #include <afs/procmgmt.h>
64
65 #if !defined(offsetof)
66 #include <stddef.h>
67 #endif
68
69 #ifdef USE_UNIX_SOCKETS
70 #include <afs/afsutil.h>
71 #include <sys/un.h>
72 #endif
73
74
75 /*@printflike@*/ extern void Log(const char *format, ...);
76
77 #ifdef osi_Assert
78 #undef osi_Assert
79 #endif
80 #define osi_Assert(e) (void)(e)
81
82 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
83                                  * move = dump+restore can run on single server */
84
85 /* Forward declarations */
86 static void * SALVSYNC_syncThread(void *);
87 static void SALVSYNC_newconnection(int fd);
88 static void SALVSYNC_com(int fd);
89 static void SALVSYNC_Drop(int fd);
90 static void AcceptOn(void);
91 static void AcceptOff(void);
92 static void InitHandler(void);
93 static void CallHandler(fd_set * fdsetp);
94 static int AddHandler(int afd, void (*aproc) (int));
95 static int FindHandler(register int afd);
96 static int FindHandler_r(register int afd);
97 static int RemoveHandler(register int afd);
98 static void GetHandler(fd_set * fdsetp, int *maxfdp);
99
100
101 /*
102  * This lock controls access to the handler array.
103  */
104 struct Lock SALVSYNC_handler_lock;
105
106
107 #ifdef AFS_DEMAND_ATTACH_FS
108 /*
109  * SALVSYNC is a feature specific to the demand attach fileserver
110  */
111
112 static int AllocNode(struct SalvageQueueNode ** node);
113
114 static int AddToSalvageQueue(struct SalvageQueueNode * node);
115 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
116 static void AddToPendingQueue(struct SalvageQueueNode * node);
117 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
118 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
119 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
120 static void UpdateCommandPrio(struct SalvageQueueNode * node);
121 static void HandlePrio(struct SalvageQueueNode * clone, 
122                        struct SalvageQueueNode * parent,
123                        afs_uint32 new_prio);
124
125 static int LinkNode(struct SalvageQueueNode * parent,
126                     struct SalvageQueueNode * clone);
127
128 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName, 
129                                             struct SalvageQueueNode ** parent);
130 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
131                                                      struct SalvageQueueNode ** parent);
132 static void AddNodeToHash(struct SalvageQueueNode * node);
133 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
134
135 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
136 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
137 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
138 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
139 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
140
141
142 extern int LogLevel;
143 extern int VInit;
144 extern pthread_mutex_t vol_salvsync_mutex;
145
146 /**
147  * salvsync server socket handle.
148  */
149 static SYNC_server_state_t salvsync_server_state = 
150     { -1,                       /* file descriptor */
151       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
152       SALVSYNC_PROTO_VERSION,   /* protocol version */
153       5,                        /* bind() retry limit */
154       100,                      /* listen() queue depth */
155       "SALVSYNC",               /* protocol name string */
156     };
157
158
159 /**
160  * queue of all volumes waiting to be salvaged.
161  */
162 struct SalvageQueue {
163     volatile int total_len;
164     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
165     volatile int len[VOLMAXPARTS+1];
166     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
167     pthread_cond_t cv;
168 };
169 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
170
171 /**
172  * queue of all volumes currently being salvaged.
173  */
174 struct QueueHead {
175     volatile struct rx_queue q;  /**< queue of salvages in progress */
176     volatile int len;            /**< length of in-progress queue */
177     pthread_cond_t queue_change_cv;
178 };
179 static struct QueueHead pendingQueue;  /* volumes being salvaged */
180
181 /* XXX
182  * whether a partition has a salvage in progress
183  *
184  * the salvager code only permits one salvage per partition at a time
185  *
186  * the following hack tries to keep salvaged parallelism high by
187  * only permitting one salvage dispatch per partition at a time
188  *
189  * unfortunately, the parallel salvager currently
190  * has a rather braindead routine that won't permit
191  * multiple salvages on the same "device".  this
192  * function happens to break pretty badly on lvm, raid luns, etc.
193  *
194  * this hack isn't good enough to stop the device limiting code from
195  * crippling performance.  someday that code needs to be rewritten
196  */
197 static int partition_salvaging[VOLMAXPARTS+1];
198
199 #define VSHASH_SIZE 64
200 #define VSHASH_MASK (VSHASH_SIZE-1)
201 #define VSHASH(vid) ((vid)&VSHASH_MASK)
202
203 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
204
205 static struct SalvageQueueNode *
206 LookupNode(afs_uint32 vid, char * partName,
207            struct SalvageQueueNode ** parent)
208 {
209     struct rx_queue *qp, *nqp;
210     struct SalvageQueueNode *vsp;
211     int idx = VSHASH(vid);
212
213     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
214         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
215         if ((vsp->command.sop.volume == vid) &&
216             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
217             break;
218         }
219     }
220
221     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
222         vsp = NULL;
223     }
224
225     if (parent) {
226         if (vsp) {
227             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
228                 vsp->volgroup.parent : vsp;
229         } else {
230             *parent = NULL;
231         }
232     }
233
234     return vsp;
235 }
236
237 static struct SalvageQueueNode *
238 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
239                     struct SalvageQueueNode ** parent)
240 {
241     return LookupNode(qry->volume, qry->partName, parent);
242 }
243
244 static void
245 AddNodeToHash(struct SalvageQueueNode * node)
246 {
247     int idx = VSHASH(node->command.sop.volume);
248
249     if (queue_IsOnQueue(&node->hash_chain)) {
250         return;
251     }
252
253     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
254     SalvageHashTable[idx].len++;
255 }
256
257 static void
258 DeleteNodeFromHash(struct SalvageQueueNode * node)
259 {
260     int idx = VSHASH(node->command.sop.volume);
261
262     if (queue_IsNotOnQueue(&node->hash_chain)) {
263         return;
264     }
265
266     queue_Remove(&node->hash_chain);
267     SalvageHashTable[idx].len--;
268 }
269
270 void
271 SALVSYNC_salvInit(void)
272 {
273     int i;
274     pthread_t tid;
275     pthread_attr_t tattr;
276
277     /* initialize the queues */
278     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
279     for (i = 0; i <= VOLMAXPARTS; i++) {
280         queue_Init(&salvageQueue.part[i]);
281         salvageQueue.len[i] = 0;
282     }
283     assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
284     queue_Init(&pendingQueue);
285     salvageQueue.total_len = pendingQueue.len = 0;
286     salvageQueue.last_insert = -1;
287     memset(partition_salvaging, 0, sizeof(partition_salvaging));
288
289     for (i = 0; i < VSHASH_SIZE; i++) {
290         assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
291         SalvageHashTable[i].len = 0;
292         queue_Init(&SalvageHashTable[i]);
293     }
294
295     /* start the salvsync thread */
296     assert(pthread_attr_init(&tattr) == 0);
297     assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
298     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
299 }
300
301
302 static fd_set SALVSYNC_readfds;
303
304 static void *
305 SALVSYNC_syncThread(void * args)
306 {
307     int on = 1;
308     int code;
309     int numTries;
310     int tid;
311     SYNC_server_state_t * state = &salvsync_server_state;
312
313     SYNC_getAddr(&state->endpoint, &state->addr);
314     SYNC_cleanupSock(state);
315
316 #ifndef AFS_NT40_ENV
317     (void)signal(SIGPIPE, SIG_IGN);
318 #endif
319
320     state->fd = SYNC_getSock(&state->endpoint);
321     code = SYNC_bindSock(state);
322     assert(!code);
323
324     InitHandler();
325     AcceptOn();
326
327     for (;;) {
328         int maxfd;
329         GetHandler(&SALVSYNC_readfds, &maxfd);
330         /* Note: check for >= 1 below is essential since IOMGR_select
331          * doesn't have exactly same semantics as select.
332          */
333         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
334             CallHandler(&SALVSYNC_readfds);
335     }
336
337     return NULL;
338 }
339
340 static void
341 SALVSYNC_newconnection(int afd)
342 {
343 #ifdef USE_UNIX_SOCKETS
344     struct sockaddr_un other;
345 #else  /* USE_UNIX_SOCKETS */
346     struct sockaddr_in other;
347 #endif
348     int junk, fd;
349     junk = sizeof(other);
350     fd = accept(afd, (struct sockaddr *)&other, &junk);
351     if (fd == -1) {
352         Log("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
353         assert(1 == 2);
354     } else if (!AddHandler(fd, SALVSYNC_com)) {
355         AcceptOff();
356         assert(AddHandler(fd, SALVSYNC_com));
357     }
358 }
359
360 /* this function processes commands from an salvsync file descriptor (fd) */
361 static afs_int32 SALV_cnt = 0;
362 static void
363 SALVSYNC_com(int fd)
364 {
365     SYNC_command com;
366     SYNC_response res;
367     SALVSYNC_response_hdr sres_hdr;
368     SALVSYNC_command scom;
369     SALVSYNC_response sres;
370     SYNC_PROTO_BUF_DECL(buf);
371     
372     com.payload.buf = (void *)buf;
373     com.payload.len = SYNC_PROTO_MAX_LEN;
374     res.payload.buf = (void *) &sres_hdr;
375     res.payload.len = sizeof(sres_hdr);
376     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
377     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
378
379     scom.hdr = &com.hdr;
380     scom.sop = (SALVSYNC_command_hdr *) buf;
381     scom.com = &com;
382     sres.hdr = &res.hdr;
383     sres.sop = &sres_hdr;
384     sres.res = &res;
385
386     SALV_cnt++;
387     if (SYNC_getCom(fd, &com)) {
388         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
389         SALVSYNC_Drop(fd);
390         return;
391     }
392
393     if (com.recv_len < sizeof(com.hdr)) {
394         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
395         res.hdr.response = SYNC_COM_ERROR;
396         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
397         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
398         goto respond;
399     }
400
401     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
402         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
403         res.hdr.response = SYNC_COM_ERROR;
404         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
405         goto respond;
406     }
407
408     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
409         res.hdr.response = SYNC_OK;
410         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
411         goto respond;
412     }
413
414     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
415         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
416         res.hdr.response = SYNC_COM_ERROR;
417         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
418         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
419         goto respond;
420     }
421
422     VOL_LOCK;
423     switch (com.hdr.command) {
424     case SALVSYNC_NOP:
425         break;
426     case SALVSYNC_SALVAGE:
427     case SALVSYNC_RAISEPRIO:
428         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
429         break;
430     case SALVSYNC_CANCEL:
431         /* cancel a salvage */
432         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
433         break;
434     case SALVSYNC_CANCELALL:
435         /* cancel all queued salvages */
436         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
437         break;
438     case SALVSYNC_QUERY:
439         /* query whether a volume is done salvaging */
440         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
441         break;
442     case SALVSYNC_OP_LINK:
443         /* link a clone to its parent in the scheduler */
444         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
445         break;
446     default:
447         res.hdr.response = SYNC_BAD_COMMAND;
448         break;
449     }
450
451     sres_hdr.sq_len = salvageQueue.total_len;
452     sres_hdr.pq_len = pendingQueue.len;
453     VOL_UNLOCK;
454
455  respond:
456     SYNC_putRes(fd, &res);
457     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
458         SALVSYNC_Drop(fd);
459     }
460 }
461
462 /**
463  * request that a volume be salvaged.
464  *
465  * @param[in]  com  inbound command object
466  * @param[out] res  outbound response object
467  *
468  * @return operation status
469  *    @retval SYNC_OK success
470  *    @retval SYNC_DENIED failed to enqueue request
471  *    @retval SYNC_FAILED malformed command packet
472  *
473  * @note this is a SALVSYNC protocol rpc handler
474  *
475  * @internal
476  *
477  * @post the volume is enqueued in the to-be-salvaged queue.  
478  *       if the volume was already in the salvage queue, its 
479  *       priority (and thus its location in the queue) are 
480  *       updated.
481  */
482 static afs_int32
483 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
484 {
485     afs_int32 code = SYNC_OK;
486     struct SalvageQueueNode * node, * clone;
487     int hash = 0;
488
489     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
490         code = SYNC_FAILED;
491         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
492         goto done;
493     }
494
495     clone = LookupNodeByCommand(com->sop, &node);
496
497     if (node == NULL) {
498         if (AllocNode(&node)) {
499             code = SYNC_DENIED;
500             res->hdr->reason = SYNC_REASON_NOMEM;
501             goto done;
502         }
503         clone = node;
504         hash = 1;
505     }
506
507     HandlePrio(clone, node, com->sop->prio);
508
509     switch (node->state) {
510     case SALVSYNC_STATE_QUEUED:
511         UpdateCommandPrio(node);
512         break;
513
514     case SALVSYNC_STATE_ERROR:
515     case SALVSYNC_STATE_DONE:
516     case SALVSYNC_STATE_UNKNOWN:
517         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
518         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
519
520         /* 
521          * make sure volgroup parent partition path is kept coherent
522          *
523          * If we ever want to support non-COW clones on a machine holding
524          * the RW site, please note that this code does not work under the
525          * conditions where someone zaps a COW clone on partition X, and
526          * subsequently creates a full clone on partition Y -- we'd need
527          * an inverse to SALVSYNC_com_Link.
528          *  -- tkeiser 11/28/2007
529          */
530         strcpy(node->command.sop.partName, com->sop->partName);
531
532         if (AddToSalvageQueue(node)) {
533             code = SYNC_DENIED;
534         }
535         break;
536
537     default:
538         break;
539     }
540
541     if (hash) {
542         AddNodeToHash(node);
543     }
544
545     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
546     res->sop->state = node->state;
547     res->sop->prio = node->command.sop.prio;
548
549  done:
550     return code;
551 }
552
553 /**
554  * cancel a pending salvage request.
555  *
556  * @param[in]  com  inbound command object
557  * @param[out] res  outbound response object
558  *
559  * @return operation status
560  *    @retval SYNC_OK success
561  *    @retval SYNC_FAILED malformed command packet
562  *
563  * @note this is a SALVSYNC protocol rpc handler
564  *
565  * @internal
566  */
567 static afs_int32
568 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
569 {
570     afs_int32 code = SYNC_OK;
571     struct SalvageQueueNode * node;
572
573     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
574         code = SYNC_FAILED;
575         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
576         goto done;
577     }
578
579     node = LookupNodeByCommand(com->sop, NULL);
580
581     if (node == NULL) {
582         res->sop->state = SALVSYNC_STATE_UNKNOWN;
583         res->sop->prio = 0;
584     } else {
585         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
586         res->sop->prio = node->command.sop.prio;
587         res->sop->state = node->state;
588         if ((node->type == SALVSYNC_VOLGROUP_PARENT) && 
589             (node->state == SALVSYNC_STATE_QUEUED)) {
590             DeleteFromSalvageQueue(node);
591         }
592     }
593
594  done:
595     return code;
596 }
597
598 /**
599  * cancel all pending salvage requests.
600  *
601  * @param[in]  com  incoming command object
602  * @param[out] res  outbound response object
603  *
604  * @return operation status
605  *    @retval SYNC_OK success
606  *
607  * @note this is a SALVSYNC protocol rpc handler
608  *
609  * @internal
610  */
611 static afs_int32
612 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
613 {
614     struct SalvageQueueNode * np, *nnp;
615     struct DiskPartition64 * dp;
616
617     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
618         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
619             DeleteFromSalvageQueue(np);
620         }
621     }
622
623     return SYNC_OK;
624 }
625
626 /**
627  * link a queue node for a clone to its parent volume.
628  *
629  * @param[in]  com   inbound command object
630  * @param[out] res   outbound response object
631  *
632  * @return operation status
633  *    @retval SYNC_OK success
634  *    @retval SYNC_FAILED malformed command packet
635  *    @retval SYNC_DENIED the request could not be completed
636  *
637  * @note this is a SALVSYNC protocol rpc handler
638  *
639  * @post the requested volume is marked as a child of another volume.
640  *       thus, future salvage requests for this volume will result in the
641  *       parent of the volume group being scheduled for salvage instead
642  *       of this clone.
643  *
644  * @internal
645  */
646 static afs_int32
647 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
648 {
649     afs_int32 code = SYNC_OK;
650     struct SalvageQueueNode * clone, * parent;
651
652     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
653         code = SYNC_FAILED;
654         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
655         goto done;
656     }
657
658     /* lookup clone's salvage scheduling node */
659     clone = LookupNodeByCommand(com->sop, NULL);
660     if (clone == NULL) {
661         code = SYNC_DENIED;
662         res->hdr->reason = SALVSYNC_REASON_ERROR;
663         goto done;
664     }
665
666     /* lookup parent's salvage scheduling node */
667     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
668     if (parent == NULL) {
669         if (AllocNode(&parent)) {
670             code = SYNC_DENIED;
671             res->hdr->reason = SYNC_REASON_NOMEM;
672             goto done;
673         }
674         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
675         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
676         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
677         AddNodeToHash(parent);
678     }
679
680     if (LinkNode(parent, clone)) {
681         code = SYNC_DENIED;
682         goto done;
683     }
684
685  done:
686     return code;
687 }
688
689 /**
690  * query the status of a volume salvage request.
691  *
692  * @param[in]  com   inbound command object
693  * @param[out] res   outbound response object
694  *
695  * @return operation status
696  *    @retval SYNC_OK success
697  *    @retval SYNC_FAILED malformed command packet
698  *
699  * @note this is a SALVSYNC protocol rpc handler
700  *
701  * @internal
702  */
703 static afs_int32
704 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
705 {
706     afs_int32 code = SYNC_OK;
707     struct SalvageQueueNode * node;
708
709     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
710         code = SYNC_FAILED;
711         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
712         goto done;
713     }
714
715     LookupNodeByCommand(com->sop, &node);
716
717     /* query whether a volume is done salvaging */
718     if (node == NULL) {
719         res->sop->state = SALVSYNC_STATE_UNKNOWN;
720         res->sop->prio = 0;
721     } else {
722         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
723         res->sop->state = node->state;
724         res->sop->prio = node->command.sop.prio;
725     }
726
727  done:
728     return code;
729 }
730
731 static void
732 SALVSYNC_Drop(int fd)
733 {
734     RemoveHandler(fd);
735 #ifdef AFS_NT40_ENV
736     closesocket(fd);
737 #else
738     close(fd);
739 #endif
740     AcceptOn();
741 }
742
743 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
744
745 static void
746 AcceptOn(void)
747 {
748     if (AcceptHandler == -1) {
749         assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
750         AcceptHandler = FindHandler(salvsync_server_state.fd);
751     }
752 }
753
754 static void
755 AcceptOff(void)
756 {
757     if (AcceptHandler != -1) {
758         assert(RemoveHandler(salvsync_server_state.fd));
759         AcceptHandler = -1;
760     }
761 }
762
763 /* The multiple FD handling code. */
764
765 static int HandlerFD[MAXHANDLERS];
766 static void (*HandlerProc[MAXHANDLERS]) (int);
767
768 static void
769 InitHandler(void)
770 {
771     register int i;
772     ObtainWriteLock(&SALVSYNC_handler_lock);
773     for (i = 0; i < MAXHANDLERS; i++) {
774         HandlerFD[i] = -1;
775         HandlerProc[i] = NULL;
776     }
777     ReleaseWriteLock(&SALVSYNC_handler_lock);
778 }
779
780 static void
781 CallHandler(fd_set * fdsetp)
782 {
783     register int i;
784     ObtainReadLock(&SALVSYNC_handler_lock);
785     for (i = 0; i < MAXHANDLERS; i++) {
786         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
787             ReleaseReadLock(&SALVSYNC_handler_lock);
788             (*HandlerProc[i]) (HandlerFD[i]);
789             ObtainReadLock(&SALVSYNC_handler_lock);
790         }
791     }
792     ReleaseReadLock(&SALVSYNC_handler_lock);
793 }
794
795 static int
796 AddHandler(int afd, void (*aproc) (int))
797 {
798     register int i;
799     ObtainWriteLock(&SALVSYNC_handler_lock);
800     for (i = 0; i < MAXHANDLERS; i++)
801         if (HandlerFD[i] == -1)
802             break;
803     if (i >= MAXHANDLERS) {
804         ReleaseWriteLock(&SALVSYNC_handler_lock);
805         return 0;
806     }
807     HandlerFD[i] = afd;
808     HandlerProc[i] = aproc;
809     ReleaseWriteLock(&SALVSYNC_handler_lock);
810     return 1;
811 }
812
813 static int
814 FindHandler(register int afd)
815 {
816     register int i;
817     ObtainReadLock(&SALVSYNC_handler_lock);
818     for (i = 0; i < MAXHANDLERS; i++)
819         if (HandlerFD[i] == afd) {
820             ReleaseReadLock(&SALVSYNC_handler_lock);
821             return i;
822         }
823     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
824     assert(1 == 2);
825     return -1;                  /* satisfy compiler */
826 }
827
828 static int
829 FindHandler_r(register int afd)
830 {
831     register int i;
832     for (i = 0; i < MAXHANDLERS; i++)
833         if (HandlerFD[i] == afd) {
834             return i;
835         }
836     assert(1 == 2);
837     return -1;                  /* satisfy compiler */
838 }
839
840 static int
841 RemoveHandler(register int afd)
842 {
843     ObtainWriteLock(&SALVSYNC_handler_lock);
844     HandlerFD[FindHandler_r(afd)] = -1;
845     ReleaseWriteLock(&SALVSYNC_handler_lock);
846     return 1;
847 }
848
849 static void
850 GetHandler(fd_set * fdsetp, int *maxfdp)
851 {
852     register int i;
853     register int maxfd = -1;
854     FD_ZERO(fdsetp);
855     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
856     for (i = 0; i < MAXHANDLERS; i++)
857         if (HandlerFD[i] != -1) {
858             FD_SET(HandlerFD[i], fdsetp);
859             if (maxfd < HandlerFD[i])
860                 maxfd = HandlerFD[i];
861         }
862     *maxfdp = maxfd;
863     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
864 }
865
866 /**
867  * allocate a salvage queue node.
868  *
869  * @param[out] node_out  address in which to store new node pointer
870  *
871  * @return operation status
872  *    @retval 0 success
873  *    @retval 1 failed to allocate node
874  *
875  * @internal
876  */
877 static int
878 AllocNode(struct SalvageQueueNode ** node_out)
879 {
880     int code = 0;
881     struct SalvageQueueNode * node;
882
883     *node_out = node = (struct SalvageQueueNode *) 
884         malloc(sizeof(struct SalvageQueueNode));
885     if (node == NULL) {
886         code = 1;
887         goto done;
888     }
889
890     memset(node, 0, sizeof(struct SalvageQueueNode));
891     node->type = SALVSYNC_VOLGROUP_PARENT;
892     node->state = SALVSYNC_STATE_UNKNOWN;
893
894  done:
895     return code;
896 }
897
898 /**
899  * link a salvage queue node to its parent.
900  *
901  * @param[in] parent  pointer to queue node for parent of volume group
902  * @param[in] clone   pointer to queue node for a clone
903  *
904  * @return operation status
905  *    @retval 0 success
906  *    @retval 1 failure
907  *
908  * @internal
909  */
910 static int
911 LinkNode(struct SalvageQueueNode * parent,
912          struct SalvageQueueNode * clone)
913 {
914     int code = 0;
915     int idx;
916
917     /* check for attaching a clone to a clone */
918     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
919         code = 1;
920         goto done;
921     }
922
923     /* check for pre-existing registration and openings */
924     for (idx = 0; idx < VOLMAXTYPES; idx++) {
925         if (parent->volgroup.children[idx] == clone) {
926             goto linked;
927         }
928         if (parent->volgroup.children[idx] == NULL) {
929             break;
930         }
931     }
932     if (idx == VOLMAXTYPES) {
933         code = 1;
934         goto done;
935     }
936
937     /* link parent and child */
938     parent->volgroup.children[idx] = clone;
939     clone->type = SALVSYNC_VOLGROUP_CLONE;
940     clone->volgroup.parent = parent;
941
942
943  linked:
944     switch (clone->state) {
945     case SALVSYNC_STATE_QUEUED:
946         DeleteFromSalvageQueue(clone);
947
948     case SALVSYNC_STATE_SALVAGING:
949         switch (parent->state) {
950         case SALVSYNC_STATE_UNKNOWN:
951         case SALVSYNC_STATE_ERROR:
952         case SALVSYNC_STATE_DONE:
953             parent->command.sop.prio = clone->command.sop.prio;
954             AddToSalvageQueue(parent);
955             break;
956
957         case SALVSYNC_STATE_QUEUED:
958             if (clone->command.sop.prio) {
959                 parent->command.sop.prio += clone->command.sop.prio;
960                 UpdateCommandPrio(parent);
961             }
962             break;
963
964         default:
965             break;
966         }
967         break;
968
969     default:
970         break;
971     }
972
973  done:
974     return code;
975 }
976
977 static void
978 HandlePrio(struct SalvageQueueNode * clone, 
979            struct SalvageQueueNode * node,
980            afs_uint32 new_prio)
981 {
982     afs_uint32 delta;
983
984     switch (node->state) {
985     case SALVSYNC_STATE_ERROR:
986     case SALVSYNC_STATE_DONE:
987     case SALVSYNC_STATE_UNKNOWN:
988         node->command.sop.prio = 0;
989         break;
990     }
991
992     if (new_prio < clone->command.sop.prio) {
993         /* strange. let's just set our delta to 1 */
994         delta = 1;
995     } else {
996         delta = new_prio - clone->command.sop.prio;
997     }
998
999     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1000         clone->command.sop.prio = new_prio;
1001     }
1002
1003     node->command.sop.prio += delta;
1004 }
1005
1006 static int
1007 AddToSalvageQueue(struct SalvageQueueNode * node)
1008 {
1009     afs_int32 id;
1010     struct SalvageQueueNode * last = NULL;
1011
1012     id = volutil_GetPartitionID(node->command.sop.partName);
1013     if (id < 0 || id > VOLMAXPARTS) {
1014         return 1;
1015     }
1016     if (!VGetPartitionById_r(id, 0)) {
1017         /* don't enqueue salvage requests for unmounted partitions */
1018         return 1;
1019     }
1020     if (queue_IsOnQueue(node)) {
1021         return 0;
1022     }
1023
1024     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1025         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1026     }
1027     queue_Append(&salvageQueue.part[id], node);
1028     salvageQueue.len[id]++;
1029     salvageQueue.total_len++;
1030     salvageQueue.last_insert = id;
1031     node->partition_id = id;
1032     node->state = SALVSYNC_STATE_QUEUED;
1033
1034     /* reorder, if necessary */
1035     if (last && last->command.sop.prio < node->command.sop.prio) {
1036         UpdateCommandPrio(node);
1037     }
1038
1039     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1040     return 0;
1041 }
1042
1043 static void
1044 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1045 {
1046     if (queue_IsOnQueue(node)) {
1047         queue_Remove(node);
1048         salvageQueue.len[node->partition_id]--;
1049         salvageQueue.total_len--;
1050         node->state = SALVSYNC_STATE_UNKNOWN;
1051         assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1052     }
1053 }
1054
1055 static void
1056 AddToPendingQueue(struct SalvageQueueNode * node)
1057 {
1058     queue_Append(&pendingQueue, node);
1059     pendingQueue.len++;
1060     node->state = SALVSYNC_STATE_SALVAGING;
1061     assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1062 }
1063
1064 static void
1065 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1066 {
1067     if (queue_IsOnQueue(node)) {
1068         queue_Remove(node);
1069         pendingQueue.len--;
1070         node->state = SALVSYNC_STATE_UNKNOWN;
1071         assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1072     }
1073 }
1074
1075 static struct SalvageQueueNode *
1076 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1077 {
1078     struct SalvageQueueNode * np, * nnp;
1079
1080     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1081         if ((np->command.sop.volume == qry->volume) && 
1082             !strncmp(np->command.sop.partName, qry->partName,
1083                      sizeof(qry->partName)))
1084             break;
1085     }
1086
1087     if (queue_IsEnd(&pendingQueue, np))
1088         np = NULL;
1089     return np;
1090 }
1091
1092 static struct SalvageQueueNode *
1093 LookupPendingCommandByPid(int pid)
1094 {
1095     struct SalvageQueueNode * np, * nnp;
1096
1097     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1098         if (np->pid == pid)
1099             break;
1100     }
1101
1102     if (queue_IsEnd(&pendingQueue, np))
1103         np = NULL;
1104     return np;
1105 }
1106
1107
1108 /* raise the priority of a previously scheduled salvage */
1109 static void
1110 UpdateCommandPrio(struct SalvageQueueNode * node)
1111 {
1112     struct SalvageQueueNode *np, *nnp;
1113     afs_int32 id;
1114     afs_uint32 prio;
1115
1116     assert(queue_IsOnQueue(node));
1117
1118     prio = node->command.sop.prio;
1119     id = node->partition_id;
1120     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1121         queue_Remove(node);
1122         queue_Prepend(&salvageQueue.part[id], node);
1123     } else {
1124         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1125             if (np->command.sop.prio > prio)
1126                 break;
1127         }
1128         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1129             queue_Remove(node);
1130             queue_Prepend(&salvageQueue.part[id], node);
1131         } else if (node != np) {
1132             queue_Remove(node);
1133             queue_InsertAfter(np, node);
1134         }
1135     }
1136 }
1137
1138 /* this will need to be rearchitected if we ever want more than one thread
1139  * to wait for new salvage nodes */
1140 struct SalvageQueueNode * 
1141 SALVSYNC_getWork(void)
1142 {
1143     int i, ret;
1144     struct DiskPartition64 * dp = NULL, * fdp;
1145     static afs_int32 next_part_sched = 0;
1146     struct SalvageQueueNode *node = NULL, *np;
1147
1148     VOL_LOCK;
1149
1150     /*
1151      * wait for work to be scheduled
1152      * if there are no disk partitions, just sit in this wait loop forever
1153      */
1154     while (!salvageQueue.total_len || !DiskPartitionList) {
1155         VOL_CV_WAIT(&salvageQueue.cv);
1156     }
1157
1158     /* 
1159      * short circuit for simple case where only one partition has
1160      * scheduled salvages
1161      */
1162     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1163         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1164         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1165         goto have_node;
1166     }
1167
1168
1169     /* 
1170      * ok, more than one partition has scheduled salvages.
1171      * now search for partitions with scheduled salvages, but no pending salvages. 
1172      */
1173     dp = VGetPartitionById_r(next_part_sched, 0);
1174     if (!dp) {
1175         dp = DiskPartitionList;
1176     }
1177     fdp = dp;
1178
1179     for (i=0 ; 
1180          !i || dp != fdp ; 
1181          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1182         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1183             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1184             goto have_node;
1185         }
1186     }
1187
1188
1189     /*
1190      * all partitions with scheduled salvages have at least one pending.
1191      * now do an exhaustive search for a scheduled salvage.
1192      */
1193     dp = fdp;
1194
1195     for (i=0 ; 
1196          !i || dp != fdp ; 
1197          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1198         if (salvageQueue.len[dp->index]) {
1199             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1200             goto have_node;
1201         }
1202     }
1203
1204     /* we should never reach this line */
1205     assert(1==2);
1206
1207  have_node:
1208     assert(node != NULL);
1209     node->pid = 0;
1210     partition_salvaging[node->partition_id]++;
1211     DeleteFromSalvageQueue(node);
1212     AddToPendingQueue(node);
1213
1214     if (dp) {
1215         /* update next_part_sched field */
1216         if (dp->next) {
1217             next_part_sched = dp->next->index;
1218         } else if (DiskPartitionList) {
1219             next_part_sched = DiskPartitionList->index;
1220         } else {
1221             next_part_sched = -1;
1222         }
1223     }
1224
1225  bail:
1226     VOL_UNLOCK;
1227     return node;
1228 }
1229
1230 /**
1231  * update internal scheduler state to reflect completion of a work unit.
1232  *
1233  * @param[in]  node    salvage queue node object pointer
1234  * @param[in]  result  worker process result code
1235  *
1236  * @post scheduler state is updated.
1237  *
1238  * @internal
1239  */
1240 static void
1241 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1242 {
1243     afs_int32 partid;
1244     int idx;
1245
1246     DeleteFromPendingQueue(node);
1247     partid = node->partition_id;
1248     if (partid >=0 && partid <= VOLMAXPARTS) {
1249         partition_salvaging[partid]--;
1250     }
1251     if (result == 0) {
1252         node->state = SALVSYNC_STATE_DONE;
1253     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1254         node->state = SALVSYNC_STATE_ERROR;
1255     }
1256
1257     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1258         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1259             if (node->volgroup.children[idx]) {
1260                 node->volgroup.children[idx]->state = node->state;
1261             }
1262         }
1263     }
1264 }
1265
1266 /**
1267  * check whether worker child failed.
1268  *
1269  * @param[in] status  status bitfield return by wait()
1270  *
1271  * @return boolean failure code
1272  *    @retval 0 child succeeded
1273  *    @retval 1 child failed
1274  *
1275  * @internal
1276  */
1277 static int
1278 ChildFailed(int status)
1279 {
1280     return (WCOREDUMP(status) || 
1281             WIFSIGNALED(status) || 
1282             ((WEXITSTATUS(status) != 0) && 
1283              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1284 }
1285
1286
1287 /**
1288  * notify salvsync scheduler of node completion, by child pid.
1289  *
1290  * @param[in]  pid     pid of worker child
1291  * @param[in]  status  worker status bitfield from wait()
1292  *
1293  * @post scheduler state is updated.
1294  *       if status code is a failure, fileserver notification was attempted
1295  *
1296  * @see SALVSYNC_doneWork_r
1297  */
1298 void
1299 SALVSYNC_doneWorkByPid(int pid, int status)
1300 {
1301     struct SalvageQueueNode * node;
1302     char partName[16];
1303     afs_uint32 volids[VOLMAXTYPES+1];
1304     unsigned int idx;
1305
1306     memset(volids, 0, sizeof(volids));
1307
1308     VOL_LOCK;
1309     node = LookupPendingCommandByPid(pid);
1310     if (node != NULL) {
1311         SALVSYNC_doneWork_r(node, status);
1312
1313         if (ChildFailed(status)) {
1314             /* populate volume id list for later processing outside the glock */
1315             volids[0] = node->command.sop.volume;
1316             strcpy(partName, node->command.sop.partName);
1317             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1318                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1319                     if (node->volgroup.children[idx]) {
1320                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1321                     }
1322                 }
1323             }
1324         }
1325     }
1326     VOL_UNLOCK;
1327
1328     /*
1329      * if necessary, notify fileserver of
1330      * failure to salvage volume group
1331      * [we cannot guarantee that the child made the
1332      *  appropriate notifications (e.g. SIGSEGV)]
1333      *  -- tkeiser 11/28/2007
1334      */
1335     if (ChildFailed(status)) {
1336         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1337             if (volids[idx]) {
1338                 FSYNC_VolOp(volids[idx],
1339                             partName,
1340                             FSYNC_VOL_FORCE_ERROR,
1341                             FSYNC_WHATEVER,
1342                             NULL);
1343             }
1344         }
1345     }
1346 }
1347
1348 #endif /* AFS_DEMAND_ATTACH_FS */