dafs-updates-20080612
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 RCSID
30     ("$Header$");
31
32 #include <sys/types.h>
33 #include <stdio.h>
34 #ifdef AFS_NT40_ENV
35 #include <winsock2.h>
36 #include <time.h>
37 #else
38 #include <sys/param.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <netdb.h>
42 #include <sys/time.h>
43 #endif
44 #include <errno.h>
45 #include <assert.h>
46 #include <signal.h>
47 #include <string.h>
48
49
50 #include <rx/xdr.h>
51 #include <afs/afsint.h>
52 #include "nfs.h"
53 #include <afs/errors.h>
54 #include "salvsync.h"
55 #include "lwp.h"
56 #include "lock.h"
57 #include <afs/afssyscalls.h>
58 #include "ihandle.h"
59 #include "vnode.h"
60 #include "volume.h"
61 #include "partition.h"
62 #include <rx/rx_queue.h>
63 #include <afs/procmgmt.h>
64
65 #if !defined(offsetof)
66 #include <stddef.h>
67 #endif
68
69 #ifdef USE_UNIX_SOCKETS
70 #include <afs/afsutil.h>
71 #include <sys/un.h>
72 #endif
73
74
75 /*@printflike@*/ extern void Log(const char *format, ...);
76
77 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
78                                  * move = dump+restore can run on single server */
79
80 /* Forward declarations */
81 static void * SALVSYNC_syncThread(void *);
82 static void SALVSYNC_newconnection(int fd);
83 static void SALVSYNC_com(int fd);
84 static void SALVSYNC_Drop(int fd);
85 static void AcceptOn(void);
86 static void AcceptOff(void);
87 static void InitHandler(void);
88 static void CallHandler(fd_set * fdsetp);
89 static int AddHandler(int afd, void (*aproc) (int));
90 static int FindHandler(register int afd);
91 static int FindHandler_r(register int afd);
92 static int RemoveHandler(register int afd);
93 static void GetHandler(fd_set * fdsetp, int *maxfdp);
94
95
96 /*
97  * This lock controls access to the handler array.
98  */
99 struct Lock SALVSYNC_handler_lock;
100
101
102 #ifdef AFS_DEMAND_ATTACH_FS
103 /*
104  * SALVSYNC is a feature specific to the demand attach fileserver
105  */
106
107 static int AllocNode(struct SalvageQueueNode ** node);
108
109 static int AddToSalvageQueue(struct SalvageQueueNode * node);
110 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
111 static void AddToPendingQueue(struct SalvageQueueNode * node);
112 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
113 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
114 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
115 static void UpdateCommandPrio(struct SalvageQueueNode * node);
116 static void HandlePrio(struct SalvageQueueNode * clone, 
117                        struct SalvageQueueNode * parent,
118                        afs_uint32 new_prio);
119
120 static int LinkNode(struct SalvageQueueNode * parent,
121                     struct SalvageQueueNode * clone);
122
123 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName, 
124                                             struct SalvageQueueNode ** parent);
125 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
126                                                      struct SalvageQueueNode ** parent);
127 static void AddNodeToHash(struct SalvageQueueNode * node);
128 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
129
130 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
131 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
132 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
133 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
134 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
135
136
137 extern int LogLevel;
138 extern int VInit;
139 extern pthread_mutex_t vol_salvsync_mutex;
140
141 /**
142  * salvsync server socket handle.
143  */
144 static SYNC_server_state_t salvsync_server_state = 
145     { -1,                       /* file descriptor */
146       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
147       SALVSYNC_PROTO_VERSION,   /* protocol version */
148       5,                        /* bind() retry limit */
149       100,                      /* listen() queue depth */
150       "SALVSYNC",               /* protocol name string */
151     };
152
153
154 /**
155  * queue of all volumes waiting to be salvaged.
156  */
157 struct SalvageQueue {
158     volatile int total_len;
159     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
160     volatile int len[VOLMAXPARTS+1];
161     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
162     pthread_cond_t cv;
163 };
164 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
165
166 /**
167  * queue of all volumes currently being salvaged.
168  */
169 struct QueueHead {
170     volatile struct rx_queue q;  /**< queue of salvages in progress */
171     volatile int len;            /**< length of in-progress queue */
172     pthread_cond_t queue_change_cv;
173 };
174 static struct QueueHead pendingQueue;  /* volumes being salvaged */
175
176 /* XXX
177  * whether a partition has a salvage in progress
178  *
179  * the salvager code only permits one salvage per partition at a time
180  *
181  * the following hack tries to keep salvaged parallelism high by
182  * only permitting one salvage dispatch per partition at a time
183  *
184  * unfortunately, the parallel salvager currently
185  * has a rather braindead routine that won't permit
186  * multiple salvages on the same "device".  this
187  * function happens to break pretty badly on lvm, raid luns, etc.
188  *
189  * this hack isn't good enough to stop the device limiting code from
190  * crippling performance.  someday that code needs to be rewritten
191  */
192 static int partition_salvaging[VOLMAXPARTS+1];
193
194 #define VSHASH_SIZE 64
195 #define VSHASH_MASK (VSHASH_SIZE-1)
196 #define VSHASH(vid) ((vid)&VSHASH_MASK)
197
198 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
199
200 static struct SalvageQueueNode *
201 LookupNode(afs_uint32 vid, char * partName,
202            struct SalvageQueueNode ** parent)
203 {
204     struct rx_queue *qp, *nqp;
205     struct SalvageQueueNode *vsp;
206     int idx = VSHASH(vid);
207
208     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
209         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
210         if ((vsp->command.sop.volume == vid) &&
211             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
212             break;
213         }
214     }
215
216     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
217         vsp = NULL;
218     }
219
220     if (parent) {
221         if (vsp) {
222             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
223                 vsp->volgroup.parent : vsp;
224         } else {
225             *parent = NULL;
226         }
227     }
228
229     return vsp;
230 }
231
232 static struct SalvageQueueNode *
233 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
234                     struct SalvageQueueNode ** parent)
235 {
236     return LookupNode(qry->volume, qry->partName, parent);
237 }
238
239 static void
240 AddNodeToHash(struct SalvageQueueNode * node)
241 {
242     int idx = VSHASH(node->command.sop.volume);
243
244     if (queue_IsOnQueue(&node->hash_chain)) {
245         return;
246     }
247
248     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
249     SalvageHashTable[idx].len++;
250 }
251
252 static void
253 DeleteNodeFromHash(struct SalvageQueueNode * node)
254 {
255     int idx = VSHASH(node->command.sop.volume);
256
257     if (queue_IsNotOnQueue(&node->hash_chain)) {
258         return;
259     }
260
261     queue_Remove(&node->hash_chain);
262     SalvageHashTable[idx].len--;
263 }
264
265 void
266 SALVSYNC_salvInit(void)
267 {
268     int i;
269     pthread_t tid;
270     pthread_attr_t tattr;
271
272     /* initialize the queues */
273     Lock_Init(&SALVSYNC_handler_lock);
274     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
275     for (i = 0; i <= VOLMAXPARTS; i++) {
276         queue_Init(&salvageQueue.part[i]);
277         salvageQueue.len[i] = 0;
278     }
279     assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
280     queue_Init(&pendingQueue);
281     salvageQueue.total_len = pendingQueue.len = 0;
282     salvageQueue.last_insert = -1;
283     memset(partition_salvaging, 0, sizeof(partition_salvaging));
284
285     for (i = 0; i < VSHASH_SIZE; i++) {
286         assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
287         SalvageHashTable[i].len = 0;
288         queue_Init(&SalvageHashTable[i]);
289     }
290
291     /* start the salvsync thread */
292     assert(pthread_attr_init(&tattr) == 0);
293     assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
294     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
295 }
296
297
298 static fd_set SALVSYNC_readfds;
299
300 static void *
301 SALVSYNC_syncThread(void * args)
302 {
303     int on = 1;
304     int code;
305     int numTries;
306     int tid;
307     SYNC_server_state_t * state = &salvsync_server_state;
308
309     SYNC_getAddr(&state->endpoint, &state->addr);
310     SYNC_cleanupSock(state);
311
312 #ifndef AFS_NT40_ENV
313     (void)signal(SIGPIPE, SIG_IGN);
314 #endif
315
316     state->fd = SYNC_getSock(&state->endpoint);
317     code = SYNC_bindSock(state);
318     assert(!code);
319
320     InitHandler();
321     AcceptOn();
322
323     for (;;) {
324         int maxfd;
325         GetHandler(&SALVSYNC_readfds, &maxfd);
326         /* Note: check for >= 1 below is essential since IOMGR_select
327          * doesn't have exactly same semantics as select.
328          */
329         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
330             CallHandler(&SALVSYNC_readfds);
331     }
332
333     return NULL;
334 }
335
336 static void
337 SALVSYNC_newconnection(int afd)
338 {
339 #ifdef USE_UNIX_SOCKETS
340     struct sockaddr_un other;
341 #else  /* USE_UNIX_SOCKETS */
342     struct sockaddr_in other;
343 #endif
344     int junk, fd;
345     junk = sizeof(other);
346     fd = accept(afd, (struct sockaddr *)&other, &junk);
347     if (fd == -1) {
348         Log("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
349         assert(1 == 2);
350     } else if (!AddHandler(fd, SALVSYNC_com)) {
351         AcceptOff();
352         assert(AddHandler(fd, SALVSYNC_com));
353     }
354 }
355
356 /* this function processes commands from an salvsync file descriptor (fd) */
357 static afs_int32 SALV_cnt = 0;
358 static void
359 SALVSYNC_com(int fd)
360 {
361     SYNC_command com;
362     SYNC_response res;
363     SALVSYNC_response_hdr sres_hdr;
364     SALVSYNC_command scom;
365     SALVSYNC_response sres;
366     SYNC_PROTO_BUF_DECL(buf);
367     
368     com.payload.buf = (void *)buf;
369     com.payload.len = SYNC_PROTO_MAX_LEN;
370     res.payload.buf = (void *) &sres_hdr;
371     res.payload.len = sizeof(sres_hdr);
372     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
373     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
374
375     scom.hdr = &com.hdr;
376     scom.sop = (SALVSYNC_command_hdr *) buf;
377     scom.com = &com;
378     sres.hdr = &res.hdr;
379     sres.sop = &sres_hdr;
380     sres.res = &res;
381
382     SALV_cnt++;
383     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
384         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
385         SALVSYNC_Drop(fd);
386         return;
387     }
388
389     if (com.recv_len < sizeof(com.hdr)) {
390         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
391         res.hdr.response = SYNC_COM_ERROR;
392         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
393         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
394         goto respond;
395     }
396
397     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
398         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
399         res.hdr.response = SYNC_COM_ERROR;
400         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
401         goto respond;
402     }
403
404     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
405         res.hdr.response = SYNC_OK;
406         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
407         goto respond;
408     }
409
410     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
411         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
412         res.hdr.response = SYNC_COM_ERROR;
413         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
414         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
415         goto respond;
416     }
417
418     res.hdr.com_seq = com.hdr.com_seq;
419
420     VOL_LOCK;
421     switch (com.hdr.command) {
422     case SALVSYNC_NOP:
423         break;
424     case SALVSYNC_SALVAGE:
425     case SALVSYNC_RAISEPRIO:
426         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
427         break;
428     case SALVSYNC_CANCEL:
429         /* cancel a salvage */
430         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
431         break;
432     case SALVSYNC_CANCELALL:
433         /* cancel all queued salvages */
434         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
435         break;
436     case SALVSYNC_QUERY:
437         /* query whether a volume is done salvaging */
438         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
439         break;
440     case SALVSYNC_OP_LINK:
441         /* link a clone to its parent in the scheduler */
442         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
443         break;
444     default:
445         res.hdr.response = SYNC_BAD_COMMAND;
446         break;
447     }
448
449     sres_hdr.sq_len = salvageQueue.total_len;
450     sres_hdr.pq_len = pendingQueue.len;
451     VOL_UNLOCK;
452
453  respond:
454     SYNC_putRes(&salvsync_server_state, fd, &res);
455     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
456         SALVSYNC_Drop(fd);
457     }
458 }
459
460 /**
461  * request that a volume be salvaged.
462  *
463  * @param[in]  com  inbound command object
464  * @param[out] res  outbound response object
465  *
466  * @return operation status
467  *    @retval SYNC_OK success
468  *    @retval SYNC_DENIED failed to enqueue request
469  *    @retval SYNC_FAILED malformed command packet
470  *
471  * @note this is a SALVSYNC protocol rpc handler
472  *
473  * @internal
474  *
475  * @post the volume is enqueued in the to-be-salvaged queue.  
476  *       if the volume was already in the salvage queue, its 
477  *       priority (and thus its location in the queue) are 
478  *       updated.
479  */
480 static afs_int32
481 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
482 {
483     afs_int32 code = SYNC_OK;
484     struct SalvageQueueNode * node, * clone;
485     int hash = 0;
486
487     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
488         code = SYNC_FAILED;
489         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
490         goto done;
491     }
492
493     clone = LookupNodeByCommand(com->sop, &node);
494
495     if (node == NULL) {
496         if (AllocNode(&node)) {
497             code = SYNC_DENIED;
498             res->hdr->reason = SYNC_REASON_NOMEM;
499             goto done;
500         }
501         clone = node;
502         hash = 1;
503     }
504
505     HandlePrio(clone, node, com->sop->prio);
506
507     switch (node->state) {
508     case SALVSYNC_STATE_QUEUED:
509         UpdateCommandPrio(node);
510         break;
511
512     case SALVSYNC_STATE_ERROR:
513     case SALVSYNC_STATE_DONE:
514     case SALVSYNC_STATE_UNKNOWN:
515         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
516         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
517
518         /* 
519          * make sure volgroup parent partition path is kept coherent
520          *
521          * If we ever want to support non-COW clones on a machine holding
522          * the RW site, please note that this code does not work under the
523          * conditions where someone zaps a COW clone on partition X, and
524          * subsequently creates a full clone on partition Y -- we'd need
525          * an inverse to SALVSYNC_com_Link.
526          *  -- tkeiser 11/28/2007
527          */
528         strcpy(node->command.sop.partName, com->sop->partName);
529
530         if (AddToSalvageQueue(node)) {
531             code = SYNC_DENIED;
532         }
533         break;
534
535     default:
536         break;
537     }
538
539     if (hash) {
540         AddNodeToHash(node);
541     }
542
543     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
544     res->sop->state = node->state;
545     res->sop->prio = node->command.sop.prio;
546
547  done:
548     return code;
549 }
550
551 /**
552  * cancel a pending salvage request.
553  *
554  * @param[in]  com  inbound command object
555  * @param[out] res  outbound response object
556  *
557  * @return operation status
558  *    @retval SYNC_OK success
559  *    @retval SYNC_FAILED malformed command packet
560  *
561  * @note this is a SALVSYNC protocol rpc handler
562  *
563  * @internal
564  */
565 static afs_int32
566 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
567 {
568     afs_int32 code = SYNC_OK;
569     struct SalvageQueueNode * node;
570
571     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
572         code = SYNC_FAILED;
573         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
574         goto done;
575     }
576
577     node = LookupNodeByCommand(com->sop, NULL);
578
579     if (node == NULL) {
580         res->sop->state = SALVSYNC_STATE_UNKNOWN;
581         res->sop->prio = 0;
582     } else {
583         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
584         res->sop->prio = node->command.sop.prio;
585         res->sop->state = node->state;
586         if ((node->type == SALVSYNC_VOLGROUP_PARENT) && 
587             (node->state == SALVSYNC_STATE_QUEUED)) {
588             DeleteFromSalvageQueue(node);
589         }
590     }
591
592  done:
593     return code;
594 }
595
596 /**
597  * cancel all pending salvage requests.
598  *
599  * @param[in]  com  incoming command object
600  * @param[out] res  outbound response object
601  *
602  * @return operation status
603  *    @retval SYNC_OK success
604  *
605  * @note this is a SALVSYNC protocol rpc handler
606  *
607  * @internal
608  */
609 static afs_int32
610 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
611 {
612     struct SalvageQueueNode * np, *nnp;
613     struct DiskPartition64 * dp;
614
615     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
616         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
617             DeleteFromSalvageQueue(np);
618         }
619     }
620
621     return SYNC_OK;
622 }
623
624 /**
625  * link a queue node for a clone to its parent volume.
626  *
627  * @param[in]  com   inbound command object
628  * @param[out] res   outbound response object
629  *
630  * @return operation status
631  *    @retval SYNC_OK success
632  *    @retval SYNC_FAILED malformed command packet
633  *    @retval SYNC_DENIED the request could not be completed
634  *
635  * @note this is a SALVSYNC protocol rpc handler
636  *
637  * @post the requested volume is marked as a child of another volume.
638  *       thus, future salvage requests for this volume will result in the
639  *       parent of the volume group being scheduled for salvage instead
640  *       of this clone.
641  *
642  * @internal
643  */
644 static afs_int32
645 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
646 {
647     afs_int32 code = SYNC_OK;
648     struct SalvageQueueNode * clone, * parent;
649
650     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
651         code = SYNC_FAILED;
652         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
653         goto done;
654     }
655
656     /* lookup clone's salvage scheduling node */
657     clone = LookupNodeByCommand(com->sop, NULL);
658     if (clone == NULL) {
659         code = SYNC_DENIED;
660         res->hdr->reason = SALVSYNC_REASON_ERROR;
661         goto done;
662     }
663
664     /* lookup parent's salvage scheduling node */
665     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
666     if (parent == NULL) {
667         if (AllocNode(&parent)) {
668             code = SYNC_DENIED;
669             res->hdr->reason = SYNC_REASON_NOMEM;
670             goto done;
671         }
672         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
673         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
674         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
675         AddNodeToHash(parent);
676     }
677
678     if (LinkNode(parent, clone)) {
679         code = SYNC_DENIED;
680         goto done;
681     }
682
683  done:
684     return code;
685 }
686
687 /**
688  * query the status of a volume salvage request.
689  *
690  * @param[in]  com   inbound command object
691  * @param[out] res   outbound response object
692  *
693  * @return operation status
694  *    @retval SYNC_OK success
695  *    @retval SYNC_FAILED malformed command packet
696  *
697  * @note this is a SALVSYNC protocol rpc handler
698  *
699  * @internal
700  */
701 static afs_int32
702 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
703 {
704     afs_int32 code = SYNC_OK;
705     struct SalvageQueueNode * node;
706
707     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
708         code = SYNC_FAILED;
709         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
710         goto done;
711     }
712
713     LookupNodeByCommand(com->sop, &node);
714
715     /* query whether a volume is done salvaging */
716     if (node == NULL) {
717         res->sop->state = SALVSYNC_STATE_UNKNOWN;
718         res->sop->prio = 0;
719     } else {
720         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
721         res->sop->state = node->state;
722         res->sop->prio = node->command.sop.prio;
723     }
724
725  done:
726     return code;
727 }
728
729 static void
730 SALVSYNC_Drop(int fd)
731 {
732     RemoveHandler(fd);
733 #ifdef AFS_NT40_ENV
734     closesocket(fd);
735 #else
736     close(fd);
737 #endif
738     AcceptOn();
739 }
740
741 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
742
743 static void
744 AcceptOn(void)
745 {
746     if (AcceptHandler == -1) {
747         assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
748         AcceptHandler = FindHandler(salvsync_server_state.fd);
749     }
750 }
751
752 static void
753 AcceptOff(void)
754 {
755     if (AcceptHandler != -1) {
756         assert(RemoveHandler(salvsync_server_state.fd));
757         AcceptHandler = -1;
758     }
759 }
760
761 /* The multiple FD handling code. */
762
763 static int HandlerFD[MAXHANDLERS];
764 static void (*HandlerProc[MAXHANDLERS]) (int);
765
766 static void
767 InitHandler(void)
768 {
769     register int i;
770     ObtainWriteLock(&SALVSYNC_handler_lock);
771     for (i = 0; i < MAXHANDLERS; i++) {
772         HandlerFD[i] = -1;
773         HandlerProc[i] = NULL;
774     }
775     ReleaseWriteLock(&SALVSYNC_handler_lock);
776 }
777
778 static void
779 CallHandler(fd_set * fdsetp)
780 {
781     register int i;
782     ObtainReadLock(&SALVSYNC_handler_lock);
783     for (i = 0; i < MAXHANDLERS; i++) {
784         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
785             ReleaseReadLock(&SALVSYNC_handler_lock);
786             (*HandlerProc[i]) (HandlerFD[i]);
787             ObtainReadLock(&SALVSYNC_handler_lock);
788         }
789     }
790     ReleaseReadLock(&SALVSYNC_handler_lock);
791 }
792
793 static int
794 AddHandler(int afd, void (*aproc) (int))
795 {
796     register int i;
797     ObtainWriteLock(&SALVSYNC_handler_lock);
798     for (i = 0; i < MAXHANDLERS; i++)
799         if (HandlerFD[i] == -1)
800             break;
801     if (i >= MAXHANDLERS) {
802         ReleaseWriteLock(&SALVSYNC_handler_lock);
803         return 0;
804     }
805     HandlerFD[i] = afd;
806     HandlerProc[i] = aproc;
807     ReleaseWriteLock(&SALVSYNC_handler_lock);
808     return 1;
809 }
810
811 static int
812 FindHandler(register int afd)
813 {
814     register int i;
815     ObtainReadLock(&SALVSYNC_handler_lock);
816     for (i = 0; i < MAXHANDLERS; i++)
817         if (HandlerFD[i] == afd) {
818             ReleaseReadLock(&SALVSYNC_handler_lock);
819             return i;
820         }
821     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
822     assert(1 == 2);
823     return -1;                  /* satisfy compiler */
824 }
825
826 static int
827 FindHandler_r(register int afd)
828 {
829     register int i;
830     for (i = 0; i < MAXHANDLERS; i++)
831         if (HandlerFD[i] == afd) {
832             return i;
833         }
834     assert(1 == 2);
835     return -1;                  /* satisfy compiler */
836 }
837
838 static int
839 RemoveHandler(register int afd)
840 {
841     ObtainWriteLock(&SALVSYNC_handler_lock);
842     HandlerFD[FindHandler_r(afd)] = -1;
843     ReleaseWriteLock(&SALVSYNC_handler_lock);
844     return 1;
845 }
846
847 static void
848 GetHandler(fd_set * fdsetp, int *maxfdp)
849 {
850     register int i;
851     register int maxfd = -1;
852     FD_ZERO(fdsetp);
853     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
854     for (i = 0; i < MAXHANDLERS; i++)
855         if (HandlerFD[i] != -1) {
856             FD_SET(HandlerFD[i], fdsetp);
857             if (maxfd < HandlerFD[i])
858                 maxfd = HandlerFD[i];
859         }
860     *maxfdp = maxfd;
861     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
862 }
863
864 /**
865  * allocate a salvage queue node.
866  *
867  * @param[out] node_out  address in which to store new node pointer
868  *
869  * @return operation status
870  *    @retval 0 success
871  *    @retval 1 failed to allocate node
872  *
873  * @internal
874  */
875 static int
876 AllocNode(struct SalvageQueueNode ** node_out)
877 {
878     int code = 0;
879     struct SalvageQueueNode * node;
880
881     *node_out = node = (struct SalvageQueueNode *) 
882         malloc(sizeof(struct SalvageQueueNode));
883     if (node == NULL) {
884         code = 1;
885         goto done;
886     }
887
888     memset(node, 0, sizeof(struct SalvageQueueNode));
889     node->type = SALVSYNC_VOLGROUP_PARENT;
890     node->state = SALVSYNC_STATE_UNKNOWN;
891
892  done:
893     return code;
894 }
895
896 /**
897  * link a salvage queue node to its parent.
898  *
899  * @param[in] parent  pointer to queue node for parent of volume group
900  * @param[in] clone   pointer to queue node for a clone
901  *
902  * @return operation status
903  *    @retval 0 success
904  *    @retval 1 failure
905  *
906  * @internal
907  */
908 static int
909 LinkNode(struct SalvageQueueNode * parent,
910          struct SalvageQueueNode * clone)
911 {
912     int code = 0;
913     int idx;
914
915     /* check for attaching a clone to a clone */
916     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
917         code = 1;
918         goto done;
919     }
920
921     /* check for pre-existing registration and openings */
922     for (idx = 0; idx < VOLMAXTYPES; idx++) {
923         if (parent->volgroup.children[idx] == clone) {
924             goto linked;
925         }
926         if (parent->volgroup.children[idx] == NULL) {
927             break;
928         }
929     }
930     if (idx == VOLMAXTYPES) {
931         code = 1;
932         goto done;
933     }
934
935     /* link parent and child */
936     parent->volgroup.children[idx] = clone;
937     clone->type = SALVSYNC_VOLGROUP_CLONE;
938     clone->volgroup.parent = parent;
939
940
941  linked:
942     switch (clone->state) {
943     case SALVSYNC_STATE_QUEUED:
944         DeleteFromSalvageQueue(clone);
945
946     case SALVSYNC_STATE_SALVAGING:
947         switch (parent->state) {
948         case SALVSYNC_STATE_UNKNOWN:
949         case SALVSYNC_STATE_ERROR:
950         case SALVSYNC_STATE_DONE:
951             parent->command.sop.prio = clone->command.sop.prio;
952             AddToSalvageQueue(parent);
953             break;
954
955         case SALVSYNC_STATE_QUEUED:
956             if (clone->command.sop.prio) {
957                 parent->command.sop.prio += clone->command.sop.prio;
958                 UpdateCommandPrio(parent);
959             }
960             break;
961
962         default:
963             break;
964         }
965         break;
966
967     default:
968         break;
969     }
970
971  done:
972     return code;
973 }
974
975 static void
976 HandlePrio(struct SalvageQueueNode * clone, 
977            struct SalvageQueueNode * node,
978            afs_uint32 new_prio)
979 {
980     afs_uint32 delta;
981
982     switch (node->state) {
983     case SALVSYNC_STATE_ERROR:
984     case SALVSYNC_STATE_DONE:
985     case SALVSYNC_STATE_UNKNOWN:
986         node->command.sop.prio = 0;
987         break;
988     }
989
990     if (new_prio < clone->command.sop.prio) {
991         /* strange. let's just set our delta to 1 */
992         delta = 1;
993     } else {
994         delta = new_prio - clone->command.sop.prio;
995     }
996
997     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
998         clone->command.sop.prio = new_prio;
999     }
1000
1001     node->command.sop.prio += delta;
1002 }
1003
1004 static int
1005 AddToSalvageQueue(struct SalvageQueueNode * node)
1006 {
1007     afs_int32 id;
1008     struct SalvageQueueNode * last = NULL;
1009
1010     id = volutil_GetPartitionID(node->command.sop.partName);
1011     if (id < 0 || id > VOLMAXPARTS) {
1012         return 1;
1013     }
1014     if (!VGetPartitionById_r(id, 0)) {
1015         /* don't enqueue salvage requests for unmounted partitions */
1016         return 1;
1017     }
1018     if (queue_IsOnQueue(node)) {
1019         return 0;
1020     }
1021
1022     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1023         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1024     }
1025     queue_Append(&salvageQueue.part[id], node);
1026     salvageQueue.len[id]++;
1027     salvageQueue.total_len++;
1028     salvageQueue.last_insert = id;
1029     node->partition_id = id;
1030     node->state = SALVSYNC_STATE_QUEUED;
1031
1032     /* reorder, if necessary */
1033     if (last && last->command.sop.prio < node->command.sop.prio) {
1034         UpdateCommandPrio(node);
1035     }
1036
1037     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1038     return 0;
1039 }
1040
1041 static void
1042 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1043 {
1044     if (queue_IsOnQueue(node)) {
1045         queue_Remove(node);
1046         salvageQueue.len[node->partition_id]--;
1047         salvageQueue.total_len--;
1048         node->state = SALVSYNC_STATE_UNKNOWN;
1049         assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1050     }
1051 }
1052
1053 static void
1054 AddToPendingQueue(struct SalvageQueueNode * node)
1055 {
1056     queue_Append(&pendingQueue, node);
1057     pendingQueue.len++;
1058     node->state = SALVSYNC_STATE_SALVAGING;
1059     assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1060 }
1061
1062 static void
1063 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1064 {
1065     if (queue_IsOnQueue(node)) {
1066         queue_Remove(node);
1067         pendingQueue.len--;
1068         node->state = SALVSYNC_STATE_UNKNOWN;
1069         assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1070     }
1071 }
1072
1073 static struct SalvageQueueNode *
1074 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1075 {
1076     struct SalvageQueueNode * np, * nnp;
1077
1078     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1079         if ((np->command.sop.volume == qry->volume) && 
1080             !strncmp(np->command.sop.partName, qry->partName,
1081                      sizeof(qry->partName)))
1082             break;
1083     }
1084
1085     if (queue_IsEnd(&pendingQueue, np))
1086         np = NULL;
1087     return np;
1088 }
1089
1090 static struct SalvageQueueNode *
1091 LookupPendingCommandByPid(int pid)
1092 {
1093     struct SalvageQueueNode * np, * nnp;
1094
1095     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1096         if (np->pid == pid)
1097             break;
1098     }
1099
1100     if (queue_IsEnd(&pendingQueue, np))
1101         np = NULL;
1102     return np;
1103 }
1104
1105
1106 /* raise the priority of a previously scheduled salvage */
1107 static void
1108 UpdateCommandPrio(struct SalvageQueueNode * node)
1109 {
1110     struct SalvageQueueNode *np, *nnp;
1111     afs_int32 id;
1112     afs_uint32 prio;
1113
1114     assert(queue_IsOnQueue(node));
1115
1116     prio = node->command.sop.prio;
1117     id = node->partition_id;
1118     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1119         queue_Remove(node);
1120         queue_Prepend(&salvageQueue.part[id], node);
1121     } else {
1122         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1123             if (np->command.sop.prio > prio)
1124                 break;
1125         }
1126         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1127             queue_Remove(node);
1128             queue_Prepend(&salvageQueue.part[id], node);
1129         } else if (node != np) {
1130             queue_Remove(node);
1131             queue_InsertAfter(np, node);
1132         }
1133     }
1134 }
1135
1136 /* this will need to be rearchitected if we ever want more than one thread
1137  * to wait for new salvage nodes */
1138 struct SalvageQueueNode * 
1139 SALVSYNC_getWork(void)
1140 {
1141     int i, ret;
1142     struct DiskPartition64 * dp = NULL, * fdp;
1143     static afs_int32 next_part_sched = 0;
1144     struct SalvageQueueNode *node = NULL, *np;
1145
1146     VOL_LOCK;
1147
1148     /*
1149      * wait for work to be scheduled
1150      * if there are no disk partitions, just sit in this wait loop forever
1151      */
1152     while (!salvageQueue.total_len || !DiskPartitionList) {
1153         VOL_CV_WAIT(&salvageQueue.cv);
1154     }
1155
1156     /* 
1157      * short circuit for simple case where only one partition has
1158      * scheduled salvages
1159      */
1160     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1161         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1162         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1163         goto have_node;
1164     }
1165
1166
1167     /* 
1168      * ok, more than one partition has scheduled salvages.
1169      * now search for partitions with scheduled salvages, but no pending salvages. 
1170      */
1171     dp = VGetPartitionById_r(next_part_sched, 0);
1172     if (!dp) {
1173         dp = DiskPartitionList;
1174     }
1175     fdp = dp;
1176
1177     for (i=0 ; 
1178          !i || dp != fdp ; 
1179          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1180         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1181             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1182             goto have_node;
1183         }
1184     }
1185
1186
1187     /*
1188      * all partitions with scheduled salvages have at least one pending.
1189      * now do an exhaustive search for a scheduled salvage.
1190      */
1191     dp = fdp;
1192
1193     for (i=0 ; 
1194          !i || dp != fdp ; 
1195          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1196         if (salvageQueue.len[dp->index]) {
1197             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1198             goto have_node;
1199         }
1200     }
1201
1202     /* we should never reach this line */
1203     assert(1==2);
1204
1205  have_node:
1206     assert(node != NULL);
1207     node->pid = 0;
1208     partition_salvaging[node->partition_id]++;
1209     DeleteFromSalvageQueue(node);
1210     AddToPendingQueue(node);
1211
1212     if (dp) {
1213         /* update next_part_sched field */
1214         if (dp->next) {
1215             next_part_sched = dp->next->index;
1216         } else if (DiskPartitionList) {
1217             next_part_sched = DiskPartitionList->index;
1218         } else {
1219             next_part_sched = -1;
1220         }
1221     }
1222
1223  bail:
1224     VOL_UNLOCK;
1225     return node;
1226 }
1227
1228 /**
1229  * update internal scheduler state to reflect completion of a work unit.
1230  *
1231  * @param[in]  node    salvage queue node object pointer
1232  * @param[in]  result  worker process result code
1233  *
1234  * @post scheduler state is updated.
1235  *
1236  * @internal
1237  */
1238 static void
1239 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1240 {
1241     afs_int32 partid;
1242     int idx;
1243
1244     DeleteFromPendingQueue(node);
1245     partid = node->partition_id;
1246     if (partid >=0 && partid <= VOLMAXPARTS) {
1247         partition_salvaging[partid]--;
1248     }
1249     if (result == 0) {
1250         node->state = SALVSYNC_STATE_DONE;
1251     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1252         node->state = SALVSYNC_STATE_ERROR;
1253     }
1254
1255     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1256         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1257             if (node->volgroup.children[idx]) {
1258                 node->volgroup.children[idx]->state = node->state;
1259             }
1260         }
1261     }
1262 }
1263
1264 /**
1265  * check whether worker child failed.
1266  *
1267  * @param[in] status  status bitfield return by wait()
1268  *
1269  * @return boolean failure code
1270  *    @retval 0 child succeeded
1271  *    @retval 1 child failed
1272  *
1273  * @internal
1274  */
1275 static int
1276 ChildFailed(int status)
1277 {
1278     return (WCOREDUMP(status) || 
1279             WIFSIGNALED(status) || 
1280             ((WEXITSTATUS(status) != 0) && 
1281              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1282 }
1283
1284
1285 /**
1286  * notify salvsync scheduler of node completion, by child pid.
1287  *
1288  * @param[in]  pid     pid of worker child
1289  * @param[in]  status  worker status bitfield from wait()
1290  *
1291  * @post scheduler state is updated.
1292  *       if status code is a failure, fileserver notification was attempted
1293  *
1294  * @see SALVSYNC_doneWork_r
1295  */
1296 void
1297 SALVSYNC_doneWorkByPid(int pid, int status)
1298 {
1299     struct SalvageQueueNode * node;
1300     char partName[16];
1301     afs_uint32 volids[VOLMAXTYPES+1];
1302     unsigned int idx;
1303
1304     memset(volids, 0, sizeof(volids));
1305
1306     VOL_LOCK;
1307     node = LookupPendingCommandByPid(pid);
1308     if (node != NULL) {
1309         SALVSYNC_doneWork_r(node, status);
1310
1311         if (ChildFailed(status)) {
1312             /* populate volume id list for later processing outside the glock */
1313             volids[0] = node->command.sop.volume;
1314             strcpy(partName, node->command.sop.partName);
1315             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1316                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1317                     if (node->volgroup.children[idx]) {
1318                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1319                     }
1320                 }
1321             }
1322         }
1323     }
1324     VOL_UNLOCK;
1325
1326     /*
1327      * if necessary, notify fileserver of
1328      * failure to salvage volume group
1329      * [we cannot guarantee that the child made the
1330      *  appropriate notifications (e.g. SIGSEGV)]
1331      *  -- tkeiser 11/28/2007
1332      */
1333     if (ChildFailed(status)) {
1334         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1335             if (volids[idx]) {
1336                 FSYNC_VolOp(volids[idx],
1337                             partName,
1338                             FSYNC_VOL_FORCE_ERROR,
1339                             FSYNC_WHATEVER,
1340                             NULL);
1341             }
1342         }
1343     }
1344 }
1345
1346 #endif /* AFS_DEMAND_ATTACH_FS */