f054dd6c6512ecff81fb1f3cafa1beadc7634c55
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29
30 #include <sys/types.h>
31 #include <stdio.h>
32 #ifdef AFS_NT40_ENV
33 #include <winsock2.h>
34 #include <time.h>
35 #else
36 #include <sys/param.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 #include <netdb.h>
40 #include <sys/time.h>
41 #endif
42 #include <errno.h>
43 #include <afs/afs_assert.h>
44 #include <signal.h>
45 #include <string.h>
46
47
48 #include <rx/xdr.h>
49 #include <afs/afsint.h>
50 #include "nfs.h"
51 #include <afs/errors.h>
52 #include "salvsync.h"
53 #include "lwp.h"
54 #include "lock.h"
55 #include <afs/afssyscalls.h>
56 #include "ihandle.h"
57 #include "vnode.h"
58 #include "volume.h"
59 #include "partition.h"
60 #include "common.h"
61 #include <rx/rx_queue.h>
62 #include <afs/procmgmt.h>
63
64 #if !defined(offsetof)
65 #include <stddef.h>
66 #endif
67
68 #ifdef USE_UNIX_SOCKETS
69 #include <afs/afsutil.h>
70 #include <sys/un.h>
71 #endif
72
73 #ifndef WCOREDUMP
74 #define WCOREDUMP(x)    ((x) & 0200)
75 #endif
76
77 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
78                                  * move = dump+restore can run on single server */
79
80
81 /*
82  * This lock controls access to the handler array.
83  */
84 struct Lock SALVSYNC_handler_lock;
85
86
87 #ifdef AFS_DEMAND_ATTACH_FS
88 /*
89  * SALVSYNC is a feature specific to the demand attach fileserver
90  */
91
92 /* Forward declarations */
93 static void * SALVSYNC_syncThread(void *);
94 static void SALVSYNC_newconnection(osi_socket fd);
95 static void SALVSYNC_com(osi_socket fd);
96 static void SALVSYNC_Drop(osi_socket fd);
97 static void AcceptOn(void);
98 static void AcceptOff(void);
99 static void InitHandler(void);
100 static void CallHandler(fd_set * fdsetp);
101 static int AddHandler(osi_socket afd, void (*aproc) (int));
102 static int FindHandler(osi_socket afd);
103 static int FindHandler_r(osi_socket afd);
104 static int RemoveHandler(osi_socket afd);
105 static void GetHandler(fd_set * fdsetp, int *maxfdp);
106
107 static int AllocNode(struct SalvageQueueNode ** node);
108
109 static int AddToSalvageQueue(struct SalvageQueueNode * node);
110 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
111 static void AddToPendingQueue(struct SalvageQueueNode * node);
112 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
113 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
114 static void UpdateCommandPrio(struct SalvageQueueNode * node);
115 static void HandlePrio(struct SalvageQueueNode * clone,
116                        struct SalvageQueueNode * parent,
117                        afs_uint32 new_prio);
118
119 static int LinkNode(struct SalvageQueueNode * parent,
120                     struct SalvageQueueNode * clone);
121
122 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
123                                             struct SalvageQueueNode ** parent);
124 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
125                                                      struct SalvageQueueNode ** parent);
126 static void AddNodeToHash(struct SalvageQueueNode * node);
127
128 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
129 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
130 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
131 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
132 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
133
134
135 extern int LogLevel;
136 extern int VInit;
137 extern pthread_mutex_t vol_salvsync_mutex;
138
139 /**
140  * salvsync server socket handle.
141  */
142 static SYNC_server_state_t salvsync_server_state =
143     { OSI_NULLSOCKET,                       /* file descriptor */
144       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
145       SALVSYNC_PROTO_VERSION,   /* protocol version */
146       5,                        /* bind() retry limit */
147       100,                      /* listen() queue depth */
148       "SALVSYNC",               /* protocol name string */
149     };
150
151
152 /**
153  * queue of all volumes waiting to be salvaged.
154  */
155 struct SalvageQueue {
156     volatile int total_len;
157     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
158     volatile int len[VOLMAXPARTS+1];
159     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
160     pthread_cond_t cv;
161 };
162 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
163
164 /**
165  * queue of all volumes currently being salvaged.
166  */
167 struct QueueHead {
168     volatile struct rx_queue q;  /**< queue of salvages in progress */
169     volatile int len;            /**< length of in-progress queue */
170     pthread_cond_t queue_change_cv;
171 };
172 static struct QueueHead pendingQueue;  /* volumes being salvaged */
173
174 /* XXX
175  * whether a partition has a salvage in progress
176  *
177  * the salvager code only permits one salvage per partition at a time
178  *
179  * the following hack tries to keep salvaged parallelism high by
180  * only permitting one salvage dispatch per partition at a time
181  *
182  * unfortunately, the parallel salvager currently
183  * has a rather braindead routine that won't permit
184  * multiple salvages on the same "device".  this
185  * function happens to break pretty badly on lvm, raid luns, etc.
186  *
187  * this hack isn't good enough to stop the device limiting code from
188  * crippling performance.  someday that code needs to be rewritten
189  */
190 static int partition_salvaging[VOLMAXPARTS+1];
191
192 static int HandlerFD[MAXHANDLERS];
193 static void (*HandlerProc[MAXHANDLERS]) (int);
194
195 #define VSHASH_SIZE 64
196 #define VSHASH_MASK (VSHASH_SIZE-1)
197 #define VSHASH(vid) ((vid)&VSHASH_MASK)
198
199 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
200
201 static struct SalvageQueueNode *
202 LookupNode(afs_uint32 vid, char * partName,
203            struct SalvageQueueNode ** parent)
204 {
205     struct rx_queue *qp, *nqp;
206     struct SalvageQueueNode *vsp;
207     int idx = VSHASH(vid);
208
209     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
210         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
211         if ((vsp->command.sop.volume == vid) &&
212             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
213             break;
214         }
215     }
216
217     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
218         vsp = NULL;
219     }
220
221     if (parent) {
222         if (vsp) {
223             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
224                 vsp->volgroup.parent : vsp;
225         } else {
226             *parent = NULL;
227         }
228     }
229
230     return vsp;
231 }
232
233 static struct SalvageQueueNode *
234 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
235                     struct SalvageQueueNode ** parent)
236 {
237     return LookupNode(qry->volume, qry->partName, parent);
238 }
239
240 static void
241 AddNodeToHash(struct SalvageQueueNode * node)
242 {
243     int idx = VSHASH(node->command.sop.volume);
244
245     if (queue_IsOnQueue(&node->hash_chain)) {
246         return;
247     }
248
249     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
250     SalvageHashTable[idx].len++;
251 }
252
253 #if 0
254 static void
255 DeleteNodeFromHash(struct SalvageQueueNode * node)
256 {
257     int idx = VSHASH(node->command.sop.volume);
258
259     if (queue_IsNotOnQueue(&node->hash_chain)) {
260         return;
261     }
262
263     queue_Remove(&node->hash_chain);
264     SalvageHashTable[idx].len--;
265 }
266 #endif
267
268 void
269 SALVSYNC_salvInit(void)
270 {
271     int i;
272     pthread_t tid;
273     pthread_attr_t tattr;
274
275     /* initialize the queues */
276     Lock_Init(&SALVSYNC_handler_lock);
277     CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
278     for (i = 0; i <= VOLMAXPARTS; i++) {
279         queue_Init(&salvageQueue.part[i]);
280         salvageQueue.len[i] = 0;
281     }
282     CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
283     queue_Init(&pendingQueue);
284     salvageQueue.total_len = pendingQueue.len = 0;
285     salvageQueue.last_insert = -1;
286     memset(partition_salvaging, 0, sizeof(partition_salvaging));
287
288     for (i = 0; i < VSHASH_SIZE; i++) {
289         CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
290         SalvageHashTable[i].len = 0;
291         queue_Init(&SalvageHashTable[i]);
292     }
293
294     /* start the salvsync thread */
295     osi_Assert(pthread_attr_init(&tattr) == 0);
296     osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
297     osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
298 }
299
300 static void
301 CleanFDs(void)
302 {
303     int i;
304     for (i = 0; i < MAXHANDLERS; ++i) {
305         if (HandlerFD[i] >= 0) {
306             SALVSYNC_Drop(HandlerFD[i]);
307         }
308     }
309
310     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
311      * have a handler */
312     close(salvsync_server_state.fd);
313     salvsync_server_state.fd = OSI_NULLSOCKET;
314 }
315
316 static fd_set SALVSYNC_readfds;
317
318 static void *
319 SALVSYNC_syncThread(void * args)
320 {
321     int code;
322     SYNC_server_state_t * state = &salvsync_server_state;
323
324     /* when we fork, the child needs to close the salvsync server sockets,
325      * otherwise, it may get salvsync requests, instead of the parent
326      * salvageserver */
327     osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
328
329     SYNC_getAddr(&state->endpoint, &state->addr);
330     SYNC_cleanupSock(state);
331
332 #ifndef AFS_NT40_ENV
333     (void)signal(SIGPIPE, SIG_IGN);
334 #endif
335
336     state->fd = SYNC_getSock(&state->endpoint);
337     code = SYNC_bindSock(state);
338     osi_Assert(!code);
339
340     InitHandler();
341     AcceptOn();
342
343     for (;;) {
344         int maxfd;
345         struct timeval s_timeout;
346         GetHandler(&SALVSYNC_readfds, &maxfd);
347         s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
348         s_timeout.tv_usec = 0;
349         /* Note: check for >= 1 below is essential since IOMGR_select
350          * doesn't have exactly same semantics as select.
351          */
352         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
353             CallHandler(&SALVSYNC_readfds);
354     }
355
356     return NULL;
357 }
358
359 static void
360 SALVSYNC_newconnection(int afd)
361 {
362 #ifdef USE_UNIX_SOCKETS
363     struct sockaddr_un other;
364 #else  /* USE_UNIX_SOCKETS */
365     struct sockaddr_in other;
366 #endif
367     int fd;
368     socklen_t junk;
369
370     junk = sizeof(other);
371     fd = accept(afd, (struct sockaddr *)&other, &junk);
372     if (fd == OSI_NULLSOCKET) {
373         osi_Panic("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
374     } else if (!AddHandler(fd, SALVSYNC_com)) {
375         AcceptOff();
376         osi_Assert(AddHandler(fd, SALVSYNC_com));
377     }
378 }
379
380 /* this function processes commands from an salvsync file descriptor (fd) */
381 static afs_int32 SALV_cnt = 0;
382 static void
383 SALVSYNC_com(osi_socket fd)
384 {
385     SYNC_command com;
386     SYNC_response res;
387     SALVSYNC_response_hdr sres_hdr;
388     SALVSYNC_command scom;
389     SALVSYNC_response sres;
390     SYNC_PROTO_BUF_DECL(buf);
391
392     memset(&com, 0, sizeof(com));
393     memset(&res, 0, sizeof(res));
394     memset(&scom, 0, sizeof(scom));
395     memset(&sres, 0, sizeof(sres));
396     memset(&sres_hdr, 0, sizeof(sres_hdr));
397
398     com.payload.buf = (void *)buf;
399     com.payload.len = SYNC_PROTO_MAX_LEN;
400     res.payload.buf = (void *) &sres_hdr;
401     res.payload.len = sizeof(sres_hdr);
402     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
403     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
404
405     scom.hdr = &com.hdr;
406     scom.sop = (SALVSYNC_command_hdr *) buf;
407     scom.com = &com;
408     sres.hdr = &res.hdr;
409     sres.sop = &sres_hdr;
410     sres.res = &res;
411
412     SALV_cnt++;
413     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
414         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
415         SALVSYNC_Drop(fd);
416         return;
417     }
418
419     if (com.recv_len < sizeof(com.hdr)) {
420         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
421         res.hdr.response = SYNC_COM_ERROR;
422         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
423         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
424         goto respond;
425     }
426
427     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
428         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
429         res.hdr.response = SYNC_COM_ERROR;
430         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
431         goto respond;
432     }
433
434     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
435         res.hdr.response = SYNC_OK;
436         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
437
438         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
439          * never wait for a response. */
440         goto done;
441     }
442
443     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
444         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
445         res.hdr.response = SYNC_COM_ERROR;
446         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
447         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
448         goto respond;
449     }
450
451     res.hdr.com_seq = com.hdr.com_seq;
452
453     VOL_LOCK;
454     switch (com.hdr.command) {
455     case SALVSYNC_NOP:
456         break;
457     case SALVSYNC_SALVAGE:
458     case SALVSYNC_RAISEPRIO:
459         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
460         break;
461     case SALVSYNC_CANCEL:
462         /* cancel a salvage */
463         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
464         break;
465     case SALVSYNC_CANCELALL:
466         /* cancel all queued salvages */
467         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
468         break;
469     case SALVSYNC_QUERY:
470         /* query whether a volume is done salvaging */
471         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
472         break;
473     case SALVSYNC_OP_LINK:
474         /* link a clone to its parent in the scheduler */
475         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
476         break;
477     default:
478         res.hdr.response = SYNC_BAD_COMMAND;
479         break;
480     }
481
482     sres_hdr.sq_len = salvageQueue.total_len;
483     sres_hdr.pq_len = pendingQueue.len;
484     VOL_UNLOCK;
485
486  respond:
487     SYNC_putRes(&salvsync_server_state, fd, &res);
488
489  done:
490     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
491         SALVSYNC_Drop(fd);
492     }
493 }
494
495 /**
496  * request that a volume be salvaged.
497  *
498  * @param[in]  com  inbound command object
499  * @param[out] res  outbound response object
500  *
501  * @return operation status
502  *    @retval SYNC_OK success
503  *    @retval SYNC_DENIED failed to enqueue request
504  *    @retval SYNC_FAILED malformed command packet
505  *
506  * @note this is a SALVSYNC protocol rpc handler
507  *
508  * @internal
509  *
510  * @post the volume is enqueued in the to-be-salvaged queue.
511  *       if the volume was already in the salvage queue, its
512  *       priority (and thus its location in the queue) are
513  *       updated.
514  */
515 static afs_int32
516 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
517 {
518     afs_int32 code = SYNC_OK;
519     struct SalvageQueueNode * node, * clone;
520     int hash = 0;
521
522     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
523         code = SYNC_FAILED;
524         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
525         goto done;
526     }
527
528     clone = LookupNodeByCommand(com->sop, &node);
529
530     if (node == NULL) {
531         if (AllocNode(&node)) {
532             code = SYNC_DENIED;
533             res->hdr->reason = SYNC_REASON_NOMEM;
534             goto done;
535         }
536         clone = node;
537         hash = 1;
538     }
539
540     HandlePrio(clone, node, com->sop->prio);
541
542     switch (node->state) {
543     case SALVSYNC_STATE_QUEUED:
544         UpdateCommandPrio(node);
545         break;
546
547     case SALVSYNC_STATE_ERROR:
548     case SALVSYNC_STATE_DONE:
549     case SALVSYNC_STATE_UNKNOWN:
550         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
551         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
552
553         /*
554          * make sure volgroup parent partition path is kept coherent
555          *
556          * If we ever want to support non-COW clones on a machine holding
557          * the RW site, please note that this code does not work under the
558          * conditions where someone zaps a COW clone on partition X, and
559          * subsequently creates a full clone on partition Y -- we'd need
560          * an inverse to SALVSYNC_com_Link.
561          *  -- tkeiser 11/28/2007
562          */
563         strcpy(node->command.sop.partName, com->sop->partName);
564
565         if (AddToSalvageQueue(node)) {
566             code = SYNC_DENIED;
567         }
568         break;
569
570     default:
571         break;
572     }
573
574     if (hash) {
575         AddNodeToHash(node);
576     }
577
578     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
579     res->sop->state = node->state;
580     res->sop->prio = node->command.sop.prio;
581
582  done:
583     return code;
584 }
585
586 /**
587  * cancel a pending salvage request.
588  *
589  * @param[in]  com  inbound command object
590  * @param[out] res  outbound response object
591  *
592  * @return operation status
593  *    @retval SYNC_OK success
594  *    @retval SYNC_FAILED malformed command packet
595  *
596  * @note this is a SALVSYNC protocol rpc handler
597  *
598  * @internal
599  */
600 static afs_int32
601 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
602 {
603     afs_int32 code = SYNC_OK;
604     struct SalvageQueueNode * node;
605
606     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
607         code = SYNC_FAILED;
608         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
609         goto done;
610     }
611
612     node = LookupNodeByCommand(com->sop, NULL);
613
614     if (node == NULL) {
615         res->sop->state = SALVSYNC_STATE_UNKNOWN;
616         res->sop->prio = 0;
617     } else {
618         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
619         res->sop->prio = node->command.sop.prio;
620         res->sop->state = node->state;
621         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
622             (node->state == SALVSYNC_STATE_QUEUED)) {
623             DeleteFromSalvageQueue(node);
624         }
625     }
626
627  done:
628     return code;
629 }
630
631 /**
632  * cancel all pending salvage requests.
633  *
634  * @param[in]  com  incoming command object
635  * @param[out] res  outbound response object
636  *
637  * @return operation status
638  *    @retval SYNC_OK success
639  *
640  * @note this is a SALVSYNC protocol rpc handler
641  *
642  * @internal
643  */
644 static afs_int32
645 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
646 {
647     struct SalvageQueueNode * np, *nnp;
648     struct DiskPartition64 * dp;
649
650     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
651         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
652             DeleteFromSalvageQueue(np);
653         }
654     }
655
656     return SYNC_OK;
657 }
658
659 /**
660  * link a queue node for a clone to its parent volume.
661  *
662  * @param[in]  com   inbound command object
663  * @param[out] res   outbound response object
664  *
665  * @return operation status
666  *    @retval SYNC_OK success
667  *    @retval SYNC_FAILED malformed command packet
668  *    @retval SYNC_DENIED the request could not be completed
669  *
670  * @note this is a SALVSYNC protocol rpc handler
671  *
672  * @post the requested volume is marked as a child of another volume.
673  *       thus, future salvage requests for this volume will result in the
674  *       parent of the volume group being scheduled for salvage instead
675  *       of this clone.
676  *
677  * @internal
678  */
679 static afs_int32
680 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
681 {
682     afs_int32 code = SYNC_OK;
683     struct SalvageQueueNode * clone, * parent;
684
685     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
686         code = SYNC_FAILED;
687         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
688         goto done;
689     }
690
691     /* lookup clone's salvage scheduling node */
692     clone = LookupNodeByCommand(com->sop, NULL);
693     if (clone == NULL) {
694         code = SYNC_DENIED;
695         res->hdr->reason = SALVSYNC_REASON_ERROR;
696         goto done;
697     }
698
699     /* lookup parent's salvage scheduling node */
700     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
701     if (parent == NULL) {
702         if (AllocNode(&parent)) {
703             code = SYNC_DENIED;
704             res->hdr->reason = SYNC_REASON_NOMEM;
705             goto done;
706         }
707         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
708         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
709         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
710         AddNodeToHash(parent);
711     }
712
713     if (LinkNode(parent, clone)) {
714         code = SYNC_DENIED;
715         goto done;
716     }
717
718  done:
719     return code;
720 }
721
722 /**
723  * query the status of a volume salvage request.
724  *
725  * @param[in]  com   inbound command object
726  * @param[out] res   outbound response object
727  *
728  * @return operation status
729  *    @retval SYNC_OK success
730  *    @retval SYNC_FAILED malformed command packet
731  *
732  * @note this is a SALVSYNC protocol rpc handler
733  *
734  * @internal
735  */
736 static afs_int32
737 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
738 {
739     afs_int32 code = SYNC_OK;
740     struct SalvageQueueNode * node;
741
742     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
743         code = SYNC_FAILED;
744         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
745         goto done;
746     }
747
748     LookupNodeByCommand(com->sop, &node);
749
750     /* query whether a volume is done salvaging */
751     if (node == NULL) {
752         res->sop->state = SALVSYNC_STATE_UNKNOWN;
753         res->sop->prio = 0;
754     } else {
755         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
756         res->sop->state = node->state;
757         res->sop->prio = node->command.sop.prio;
758     }
759
760  done:
761     return code;
762 }
763
764 static void
765 SALVSYNC_Drop(osi_socket fd)
766 {
767     RemoveHandler(fd);
768 #ifdef AFS_NT40_ENV
769     closesocket(fd);
770 #else
771     close(fd);
772 #endif
773     AcceptOn();
774 }
775
776 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
777
778 static void
779 AcceptOn(void)
780 {
781     if (AcceptHandler == -1) {
782         osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
783         AcceptHandler = FindHandler(salvsync_server_state.fd);
784     }
785 }
786
787 static void
788 AcceptOff(void)
789 {
790     if (AcceptHandler != -1) {
791         osi_Assert(RemoveHandler(salvsync_server_state.fd));
792         AcceptHandler = -1;
793     }
794 }
795
796 /* The multiple FD handling code. */
797
798 static void
799 InitHandler(void)
800 {
801     int i;
802     ObtainWriteLock(&SALVSYNC_handler_lock);
803     for (i = 0; i < MAXHANDLERS; i++) {
804         HandlerFD[i] = OSI_NULLSOCKET;
805         HandlerProc[i] = NULL;
806     }
807     ReleaseWriteLock(&SALVSYNC_handler_lock);
808 }
809
810 static void
811 CallHandler(fd_set * fdsetp)
812 {
813     int i;
814     ObtainReadLock(&SALVSYNC_handler_lock);
815     for (i = 0; i < MAXHANDLERS; i++) {
816         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
817             ReleaseReadLock(&SALVSYNC_handler_lock);
818             (*HandlerProc[i]) (HandlerFD[i]);
819             ObtainReadLock(&SALVSYNC_handler_lock);
820         }
821     }
822     ReleaseReadLock(&SALVSYNC_handler_lock);
823 }
824
825 static int
826 AddHandler(osi_socket afd, void (*aproc) (int))
827 {
828     int i;
829     ObtainWriteLock(&SALVSYNC_handler_lock);
830     for (i = 0; i < MAXHANDLERS; i++)
831         if (HandlerFD[i] == OSI_NULLSOCKET)
832             break;
833     if (i >= MAXHANDLERS) {
834         ReleaseWriteLock(&SALVSYNC_handler_lock);
835         return 0;
836     }
837     HandlerFD[i] = afd;
838     HandlerProc[i] = aproc;
839     ReleaseWriteLock(&SALVSYNC_handler_lock);
840     return 1;
841 }
842
843 static int
844 FindHandler(osi_socket afd)
845 {
846     int i;
847     ObtainReadLock(&SALVSYNC_handler_lock);
848     for (i = 0; i < MAXHANDLERS; i++)
849         if (HandlerFD[i] == afd) {
850             ReleaseReadLock(&SALVSYNC_handler_lock);
851             return i;
852         }
853     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
854     osi_Panic("Failed to find handler\n");
855     return -1;                  /* satisfy compiler */
856 }
857
858 static int
859 FindHandler_r(osi_socket afd)
860 {
861     int i;
862     for (i = 0; i < MAXHANDLERS; i++)
863         if (HandlerFD[i] == afd) {
864             return i;
865         }
866     osi_Panic("Failed to find handler\n");
867     return -1;                  /* satisfy compiler */
868 }
869
870 static int
871 RemoveHandler(osi_socket afd)
872 {
873     ObtainWriteLock(&SALVSYNC_handler_lock);
874     HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
875     ReleaseWriteLock(&SALVSYNC_handler_lock);
876     return 1;
877 }
878
879 static void
880 GetHandler(fd_set * fdsetp, int *maxfdp)
881 {
882     int i;
883     int maxfd = -1;
884     FD_ZERO(fdsetp);
885     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
886     for (i = 0; i < MAXHANDLERS; i++)
887         if (HandlerFD[i] != OSI_NULLSOCKET) {
888             FD_SET(HandlerFD[i], fdsetp);
889 #ifndef AFS_NT40_ENV
890             /* On Windows the nfds parameter to select() is ignored */
891             if (maxfd < HandlerFD[i] || maxfd == (int)-1)
892                 maxfd = HandlerFD[i];
893 #endif
894         }
895     *maxfdp = maxfd;
896     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
897 }
898
899 /**
900  * allocate a salvage queue node.
901  *
902  * @param[out] node_out  address in which to store new node pointer
903  *
904  * @return operation status
905  *    @retval 0 success
906  *    @retval 1 failed to allocate node
907  *
908  * @internal
909  */
910 static int
911 AllocNode(struct SalvageQueueNode ** node_out)
912 {
913     int code = 0;
914     struct SalvageQueueNode * node;
915
916     *node_out = node = (struct SalvageQueueNode *)
917         malloc(sizeof(struct SalvageQueueNode));
918     if (node == NULL) {
919         code = 1;
920         goto done;
921     }
922
923     memset(node, 0, sizeof(struct SalvageQueueNode));
924     node->type = SALVSYNC_VOLGROUP_PARENT;
925     node->state = SALVSYNC_STATE_UNKNOWN;
926
927  done:
928     return code;
929 }
930
931 /**
932  * link a salvage queue node to its parent.
933  *
934  * @param[in] parent  pointer to queue node for parent of volume group
935  * @param[in] clone   pointer to queue node for a clone
936  *
937  * @return operation status
938  *    @retval 0 success
939  *    @retval 1 failure
940  *
941  * @internal
942  */
943 static int
944 LinkNode(struct SalvageQueueNode * parent,
945          struct SalvageQueueNode * clone)
946 {
947     int code = 0;
948     int idx;
949
950     /* check for attaching a clone to a clone */
951     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
952         code = 1;
953         goto done;
954     }
955
956     /* check for pre-existing registration and openings */
957     for (idx = 0; idx < VOLMAXTYPES; idx++) {
958         if (parent->volgroup.children[idx] == clone) {
959             goto linked;
960         }
961         if (parent->volgroup.children[idx] == NULL) {
962             break;
963         }
964     }
965     if (idx == VOLMAXTYPES) {
966         code = 1;
967         goto done;
968     }
969
970     /* link parent and child */
971     parent->volgroup.children[idx] = clone;
972     clone->type = SALVSYNC_VOLGROUP_CLONE;
973     clone->volgroup.parent = parent;
974
975
976  linked:
977     switch (clone->state) {
978     case SALVSYNC_STATE_QUEUED:
979         DeleteFromSalvageQueue(clone);
980
981     case SALVSYNC_STATE_SALVAGING:
982         switch (parent->state) {
983         case SALVSYNC_STATE_UNKNOWN:
984         case SALVSYNC_STATE_ERROR:
985         case SALVSYNC_STATE_DONE:
986             parent->command.sop.prio = clone->command.sop.prio;
987             AddToSalvageQueue(parent);
988             break;
989
990         case SALVSYNC_STATE_QUEUED:
991             if (clone->command.sop.prio) {
992                 parent->command.sop.prio += clone->command.sop.prio;
993                 UpdateCommandPrio(parent);
994             }
995             break;
996
997         default:
998             break;
999         }
1000         break;
1001
1002     default:
1003         break;
1004     }
1005
1006  done:
1007     return code;
1008 }
1009
1010 static void
1011 HandlePrio(struct SalvageQueueNode * clone,
1012            struct SalvageQueueNode * node,
1013            afs_uint32 new_prio)
1014 {
1015     afs_uint32 delta;
1016
1017     switch (node->state) {
1018     case SALVSYNC_STATE_ERROR:
1019     case SALVSYNC_STATE_DONE:
1020     case SALVSYNC_STATE_UNKNOWN:
1021         node->command.sop.prio = 0;
1022         break;
1023     default:
1024         break;
1025     }
1026
1027     if (new_prio < clone->command.sop.prio) {
1028         /* strange. let's just set our delta to 1 */
1029         delta = 1;
1030     } else {
1031         delta = new_prio - clone->command.sop.prio;
1032     }
1033
1034     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1035         clone->command.sop.prio = new_prio;
1036     }
1037
1038     node->command.sop.prio += delta;
1039 }
1040
1041 static int
1042 AddToSalvageQueue(struct SalvageQueueNode * node)
1043 {
1044     afs_int32 id;
1045     struct SalvageQueueNode * last = NULL;
1046
1047     id = volutil_GetPartitionID(node->command.sop.partName);
1048     if (id < 0 || id > VOLMAXPARTS) {
1049         return 1;
1050     }
1051     if (!VGetPartitionById_r(id, 0)) {
1052         /* don't enqueue salvage requests for unmounted partitions */
1053         return 1;
1054     }
1055     if (queue_IsOnQueue(node)) {
1056         return 0;
1057     }
1058
1059     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1060         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1061     }
1062     queue_Append(&salvageQueue.part[id], node);
1063     salvageQueue.len[id]++;
1064     salvageQueue.total_len++;
1065     salvageQueue.last_insert = id;
1066     node->partition_id = id;
1067     node->state = SALVSYNC_STATE_QUEUED;
1068
1069     /* reorder, if necessary */
1070     if (last && last->command.sop.prio < node->command.sop.prio) {
1071         UpdateCommandPrio(node);
1072     }
1073
1074     CV_BROADCAST(&salvageQueue.cv);
1075     return 0;
1076 }
1077
1078 static void
1079 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1080 {
1081     if (queue_IsOnQueue(node)) {
1082         queue_Remove(node);
1083         salvageQueue.len[node->partition_id]--;
1084         salvageQueue.total_len--;
1085         node->state = SALVSYNC_STATE_UNKNOWN;
1086         CV_BROADCAST(&salvageQueue.cv);
1087     }
1088 }
1089
1090 static void
1091 AddToPendingQueue(struct SalvageQueueNode * node)
1092 {
1093     queue_Append(&pendingQueue, node);
1094     pendingQueue.len++;
1095     node->state = SALVSYNC_STATE_SALVAGING;
1096     CV_BROADCAST(&pendingQueue.queue_change_cv);
1097 }
1098
1099 static void
1100 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1101 {
1102     if (queue_IsOnQueue(node)) {
1103         queue_Remove(node);
1104         pendingQueue.len--;
1105         node->state = SALVSYNC_STATE_UNKNOWN;
1106         CV_BROADCAST(&pendingQueue.queue_change_cv);
1107     }
1108 }
1109
1110 #if 0
1111 static struct SalvageQueueNode *
1112 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1113 {
1114     struct SalvageQueueNode * np, * nnp;
1115
1116     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1117         if ((np->command.sop.volume == qry->volume) &&
1118             !strncmp(np->command.sop.partName, qry->partName,
1119                      sizeof(qry->partName)))
1120             break;
1121     }
1122
1123     if (queue_IsEnd(&pendingQueue, np))
1124         np = NULL;
1125     return np;
1126 }
1127 #endif
1128
1129 static struct SalvageQueueNode *
1130 LookupPendingCommandByPid(int pid)
1131 {
1132     struct SalvageQueueNode * np, * nnp;
1133
1134     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1135         if (np->pid == pid)
1136             break;
1137     }
1138
1139     if (queue_IsEnd(&pendingQueue, np))
1140         np = NULL;
1141     return np;
1142 }
1143
1144
1145 /* raise the priority of a previously scheduled salvage */
1146 static void
1147 UpdateCommandPrio(struct SalvageQueueNode * node)
1148 {
1149     struct SalvageQueueNode *np, *nnp;
1150     afs_int32 id;
1151     afs_uint32 prio;
1152
1153     osi_Assert(queue_IsOnQueue(node));
1154
1155     prio = node->command.sop.prio;
1156     id = node->partition_id;
1157     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1158         queue_Remove(node);
1159         queue_Prepend(&salvageQueue.part[id], node);
1160     } else {
1161         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1162             if (np->command.sop.prio > prio)
1163                 break;
1164         }
1165         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1166             queue_Remove(node);
1167             queue_Prepend(&salvageQueue.part[id], node);
1168         } else if (node != np) {
1169             queue_Remove(node);
1170             queue_InsertAfter(np, node);
1171         }
1172     }
1173 }
1174
1175 /* this will need to be rearchitected if we ever want more than one thread
1176  * to wait for new salvage nodes */
1177 struct SalvageQueueNode *
1178 SALVSYNC_getWork(void)
1179 {
1180     int i;
1181     struct DiskPartition64 * dp = NULL, * fdp;
1182     static afs_int32 next_part_sched = 0;
1183     struct SalvageQueueNode *node = NULL;
1184
1185     VOL_LOCK;
1186
1187     /*
1188      * wait for work to be scheduled
1189      * if there are no disk partitions, just sit in this wait loop forever
1190      */
1191     while (!salvageQueue.total_len || !DiskPartitionList) {
1192         VOL_CV_WAIT(&salvageQueue.cv);
1193     }
1194
1195     /*
1196      * short circuit for simple case where only one partition has
1197      * scheduled salvages
1198      */
1199     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1200         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1201         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1202         goto have_node;
1203     }
1204
1205
1206     /*
1207      * ok, more than one partition has scheduled salvages.
1208      * now search for partitions with scheduled salvages, but no pending salvages.
1209      */
1210     dp = VGetPartitionById_r(next_part_sched, 0);
1211     if (!dp) {
1212         dp = DiskPartitionList;
1213     }
1214     fdp = dp;
1215
1216     for (i=0 ;
1217          !i || dp != fdp ;
1218          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1219         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1220             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1221             goto have_node;
1222         }
1223     }
1224
1225
1226     /*
1227      * all partitions with scheduled salvages have at least one pending.
1228      * now do an exhaustive search for a scheduled salvage.
1229      */
1230     dp = fdp;
1231
1232     for (i=0 ;
1233          !i || dp != fdp ;
1234          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1235         if (salvageQueue.len[dp->index]) {
1236             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1237             goto have_node;
1238         }
1239     }
1240
1241     /* we should never reach this line */
1242     osi_Panic("Node not found\n");
1243
1244  have_node:
1245     osi_Assert(node != NULL);
1246     node->pid = 0;
1247     partition_salvaging[node->partition_id]++;
1248     DeleteFromSalvageQueue(node);
1249     AddToPendingQueue(node);
1250
1251     if (dp) {
1252         /* update next_part_sched field */
1253         if (dp->next) {
1254             next_part_sched = dp->next->index;
1255         } else if (DiskPartitionList) {
1256             next_part_sched = DiskPartitionList->index;
1257         } else {
1258             next_part_sched = -1;
1259         }
1260     }
1261
1262     VOL_UNLOCK;
1263     return node;
1264 }
1265
1266 /**
1267  * update internal scheduler state to reflect completion of a work unit.
1268  *
1269  * @param[in]  node    salvage queue node object pointer
1270  * @param[in]  result  worker process result code
1271  *
1272  * @post scheduler state is updated.
1273  *
1274  * @internal
1275  */
1276 static void
1277 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1278 {
1279     afs_int32 partid;
1280     int idx;
1281
1282     DeleteFromPendingQueue(node);
1283     partid = node->partition_id;
1284     if (partid >=0 && partid <= VOLMAXPARTS) {
1285         partition_salvaging[partid]--;
1286     }
1287     if (result == 0) {
1288         node->state = SALVSYNC_STATE_DONE;
1289     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1290         node->state = SALVSYNC_STATE_ERROR;
1291     }
1292
1293     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1294         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1295             if (node->volgroup.children[idx]) {
1296                 node->volgroup.children[idx]->state = node->state;
1297             }
1298         }
1299     }
1300 }
1301
1302 /**
1303  * check whether worker child failed.
1304  *
1305  * @param[in] status  status bitfield return by wait()
1306  *
1307  * @return boolean failure code
1308  *    @retval 0 child succeeded
1309  *    @retval 1 child failed
1310  *
1311  * @internal
1312  */
1313 static int
1314 ChildFailed(int status)
1315 {
1316     return (WCOREDUMP(status) ||
1317             WIFSIGNALED(status) ||
1318             ((WEXITSTATUS(status) != 0) &&
1319              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1320 }
1321
1322
1323 /**
1324  * notify salvsync scheduler of node completion, by child pid.
1325  *
1326  * @param[in]  pid     pid of worker child
1327  * @param[in]  status  worker status bitfield from wait()
1328  *
1329  * @post scheduler state is updated.
1330  *       if status code is a failure, fileserver notification was attempted
1331  *
1332  * @see SALVSYNC_doneWork_r
1333  */
1334 void
1335 SALVSYNC_doneWorkByPid(int pid, int status)
1336 {
1337     struct SalvageQueueNode * node;
1338     char partName[16];
1339     afs_uint32 volids[VOLMAXTYPES+1];
1340     unsigned int idx;
1341
1342     memset(volids, 0, sizeof(volids));
1343
1344     VOL_LOCK;
1345     node = LookupPendingCommandByPid(pid);
1346     if (node != NULL) {
1347         SALVSYNC_doneWork_r(node, status);
1348
1349         if (ChildFailed(status)) {
1350             /* populate volume id list for later processing outside the glock */
1351             volids[0] = node->command.sop.volume;
1352             strcpy(partName, node->command.sop.partName);
1353             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1354                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1355                     if (node->volgroup.children[idx]) {
1356                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1357                     }
1358                 }
1359             }
1360         }
1361     }
1362     VOL_UNLOCK;
1363
1364     /*
1365      * if necessary, notify fileserver of
1366      * failure to salvage volume group
1367      * [we cannot guarantee that the child made the
1368      *  appropriate notifications (e.g. SIGSEGV)]
1369      *  -- tkeiser 11/28/2007
1370      */
1371     if (ChildFailed(status)) {
1372         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1373             if (volids[idx]) {
1374                 FSYNC_VolOp(volids[idx],
1375                             partName,
1376                             FSYNC_VOL_FORCE_ERROR,
1377                             FSYNC_WHATEVER,
1378                             NULL);
1379             }
1380         }
1381     }
1382 }
1383
1384 #endif /* AFS_DEMAND_ATTACH_FS */