vol: Use OSI_NULLSOCKET and not -1 to indicate invalid fssync fd
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29
30 #include <sys/types.h>
31 #include <stdio.h>
32 #ifdef AFS_NT40_ENV
33 #include <winsock2.h>
34 #include <time.h>
35 #else
36 #include <sys/param.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 #include <netdb.h>
40 #include <sys/time.h>
41 #endif
42 #include <errno.h>
43 #include <afs/afs_assert.h>
44 #include <signal.h>
45 #include <string.h>
46
47
48 #include <rx/xdr.h>
49 #include <afs/afsint.h>
50 #include "nfs.h"
51 #include <afs/errors.h>
52 #include "salvsync.h"
53 #include "lwp.h"
54 #include "lock.h"
55 #include <afs/afssyscalls.h>
56 #include "ihandle.h"
57 #include "vnode.h"
58 #include "volume.h"
59 #include "partition.h"
60 #include "common.h"
61 #include <rx/rx_queue.h>
62 #include <afs/procmgmt.h>
63
64 #if !defined(offsetof)
65 #include <stddef.h>
66 #endif
67
68 #ifdef USE_UNIX_SOCKETS
69 #include <afs/afsutil.h>
70 #include <sys/un.h>
71 #endif
72
73 #ifndef WCOREDUMP
74 #define WCOREDUMP(x)    ((x) & 0200)
75 #endif
76
77 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
78                                  * move = dump+restore can run on single server */
79
80
81 /*
82  * This lock controls access to the handler array.
83  */
84 struct Lock SALVSYNC_handler_lock;
85
86
87 #ifdef AFS_DEMAND_ATTACH_FS
88 /*
89  * SALVSYNC is a feature specific to the demand attach fileserver
90  */
91
92 /* Forward declarations */
93 static void * SALVSYNC_syncThread(void *);
94 static void SALVSYNC_newconnection(osi_socket fd);
95 static void SALVSYNC_com(osi_socket fd);
96 static void SALVSYNC_Drop(osi_socket fd);
97 static void AcceptOn(void);
98 static void AcceptOff(void);
99 static void InitHandler(void);
100 static void CallHandler(fd_set * fdsetp);
101 static int AddHandler(osi_socket afd, void (*aproc) (int));
102 static int FindHandler(osi_socket afd);
103 static int FindHandler_r(osi_socket afd);
104 static int RemoveHandler(osi_socket afd);
105 static void GetHandler(fd_set * fdsetp, int *maxfdp);
106
107 static int AllocNode(struct SalvageQueueNode ** node);
108
109 static int AddToSalvageQueue(struct SalvageQueueNode * node);
110 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
111 static void AddToPendingQueue(struct SalvageQueueNode * node);
112 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
113 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
114 static void UpdateCommandPrio(struct SalvageQueueNode * node);
115 static void HandlePrio(struct SalvageQueueNode * clone,
116                        struct SalvageQueueNode * parent,
117                        afs_uint32 new_prio);
118
119 static int LinkNode(struct SalvageQueueNode * parent,
120                     struct SalvageQueueNode * clone);
121
122 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
123                                             struct SalvageQueueNode ** parent);
124 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
125                                                      struct SalvageQueueNode ** parent);
126 static void AddNodeToHash(struct SalvageQueueNode * node);
127
128 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
129 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
130 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
131 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
132 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
133
134
135 extern int LogLevel;
136 extern int VInit;
137 extern pthread_mutex_t vol_salvsync_mutex;
138
139 /**
140  * salvsync server socket handle.
141  */
142 static SYNC_server_state_t salvsync_server_state =
143     { OSI_NULLSOCKET,                       /* file descriptor */
144       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
145       SALVSYNC_PROTO_VERSION,   /* protocol version */
146       5,                        /* bind() retry limit */
147       100,                      /* listen() queue depth */
148       "SALVSYNC",               /* protocol name string */
149     };
150
151
152 /**
153  * queue of all volumes waiting to be salvaged.
154  */
155 struct SalvageQueue {
156     volatile int total_len;
157     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
158     volatile int len[VOLMAXPARTS+1];
159     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
160     pthread_cond_t cv;
161 };
162 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
163
164 /**
165  * queue of all volumes currently being salvaged.
166  */
167 struct QueueHead {
168     volatile struct rx_queue q;  /**< queue of salvages in progress */
169     volatile int len;            /**< length of in-progress queue */
170     pthread_cond_t queue_change_cv;
171 };
172 static struct QueueHead pendingQueue;  /* volumes being salvaged */
173
174 /* XXX
175  * whether a partition has a salvage in progress
176  *
177  * the salvager code only permits one salvage per partition at a time
178  *
179  * the following hack tries to keep salvaged parallelism high by
180  * only permitting one salvage dispatch per partition at a time
181  *
182  * unfortunately, the parallel salvager currently
183  * has a rather braindead routine that won't permit
184  * multiple salvages on the same "device".  this
185  * function happens to break pretty badly on lvm, raid luns, etc.
186  *
187  * this hack isn't good enough to stop the device limiting code from
188  * crippling performance.  someday that code needs to be rewritten
189  */
190 static int partition_salvaging[VOLMAXPARTS+1];
191
192 static int HandlerFD[MAXHANDLERS];
193 static void (*HandlerProc[MAXHANDLERS]) (int);
194
195 #define VSHASH_SIZE 64
196 #define VSHASH_MASK (VSHASH_SIZE-1)
197 #define VSHASH(vid) ((vid)&VSHASH_MASK)
198
199 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
200
201 static struct SalvageQueueNode *
202 LookupNode(afs_uint32 vid, char * partName,
203            struct SalvageQueueNode ** parent)
204 {
205     struct rx_queue *qp, *nqp;
206     struct SalvageQueueNode *vsp;
207     int idx = VSHASH(vid);
208
209     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
210         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
211         if ((vsp->command.sop.volume == vid) &&
212             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
213             break;
214         }
215     }
216
217     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
218         vsp = NULL;
219     }
220
221     if (parent) {
222         if (vsp) {
223             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
224                 vsp->volgroup.parent : vsp;
225         } else {
226             *parent = NULL;
227         }
228     }
229
230     return vsp;
231 }
232
233 static struct SalvageQueueNode *
234 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
235                     struct SalvageQueueNode ** parent)
236 {
237     return LookupNode(qry->volume, qry->partName, parent);
238 }
239
240 static void
241 AddNodeToHash(struct SalvageQueueNode * node)
242 {
243     int idx = VSHASH(node->command.sop.volume);
244
245     if (queue_IsOnQueue(&node->hash_chain)) {
246         return;
247     }
248
249     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
250     SalvageHashTable[idx].len++;
251 }
252
253 #if 0
254 static void
255 DeleteNodeFromHash(struct SalvageQueueNode * node)
256 {
257     int idx = VSHASH(node->command.sop.volume);
258
259     if (queue_IsNotOnQueue(&node->hash_chain)) {
260         return;
261     }
262
263     queue_Remove(&node->hash_chain);
264     SalvageHashTable[idx].len--;
265 }
266 #endif
267
268 void
269 SALVSYNC_salvInit(void)
270 {
271     int i;
272     pthread_t tid;
273     pthread_attr_t tattr;
274
275     /* initialize the queues */
276     Lock_Init(&SALVSYNC_handler_lock);
277     CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
278     for (i = 0; i <= VOLMAXPARTS; i++) {
279         queue_Init(&salvageQueue.part[i]);
280         salvageQueue.len[i] = 0;
281     }
282     CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
283     queue_Init(&pendingQueue);
284     salvageQueue.total_len = pendingQueue.len = 0;
285     salvageQueue.last_insert = -1;
286     memset(partition_salvaging, 0, sizeof(partition_salvaging));
287
288     for (i = 0; i < VSHASH_SIZE; i++) {
289         CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
290         SalvageHashTable[i].len = 0;
291         queue_Init(&SalvageHashTable[i]);
292     }
293
294     /* start the salvsync thread */
295     osi_Assert(pthread_attr_init(&tattr) == 0);
296     osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
297     osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
298 }
299
300 static void
301 CleanFDs(void)
302 {
303     int i;
304     for (i = 0; i < MAXHANDLERS; ++i) {
305         if (HandlerFD[i] >= 0) {
306             SALVSYNC_Drop(HandlerFD[i]);
307         }
308     }
309
310     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
311      * have a handler */
312     close(salvsync_server_state.fd);
313     salvsync_server_state.fd = OSI_NULLSOCKET;
314 }
315
316 static fd_set SALVSYNC_readfds;
317
318 static void *
319 SALVSYNC_syncThread(void * args)
320 {
321     int code;
322     SYNC_server_state_t * state = &salvsync_server_state;
323
324     /* when we fork, the child needs to close the salvsync server sockets,
325      * otherwise, it may get salvsync requests, instead of the parent
326      * salvageserver */
327     osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
328
329     SYNC_getAddr(&state->endpoint, &state->addr);
330     SYNC_cleanupSock(state);
331
332 #ifndef AFS_NT40_ENV
333     (void)signal(SIGPIPE, SIG_IGN);
334 #endif
335
336     state->fd = SYNC_getSock(&state->endpoint);
337     code = SYNC_bindSock(state);
338     osi_Assert(!code);
339
340     InitHandler();
341     AcceptOn();
342
343     for (;;) {
344         int maxfd;
345         GetHandler(&SALVSYNC_readfds, &maxfd);
346         /* Note: check for >= 1 below is essential since IOMGR_select
347          * doesn't have exactly same semantics as select.
348          */
349         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
350             CallHandler(&SALVSYNC_readfds);
351     }
352
353     return NULL;
354 }
355
356 static void
357 SALVSYNC_newconnection(int afd)
358 {
359 #ifdef USE_UNIX_SOCKETS
360     struct sockaddr_un other;
361 #else  /* USE_UNIX_SOCKETS */
362     struct sockaddr_in other;
363 #endif
364     int fd;
365     socklen_t junk;
366
367     junk = sizeof(other);
368     fd = accept(afd, (struct sockaddr *)&other, &junk);
369     if (fd == OSI_NULLSOCKET) {
370         osi_Panic("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
371     } else if (!AddHandler(fd, SALVSYNC_com)) {
372         AcceptOff();
373         osi_Assert(AddHandler(fd, SALVSYNC_com));
374     }
375 }
376
377 /* this function processes commands from an salvsync file descriptor (fd) */
378 static afs_int32 SALV_cnt = 0;
379 static void
380 SALVSYNC_com(osi_socket fd)
381 {
382     SYNC_command com;
383     SYNC_response res;
384     SALVSYNC_response_hdr sres_hdr;
385     SALVSYNC_command scom;
386     SALVSYNC_response sres;
387     SYNC_PROTO_BUF_DECL(buf);
388
389     memset(&com, 0, sizeof(com));
390     memset(&res, 0, sizeof(res));
391     memset(&scom, 0, sizeof(scom));
392     memset(&sres, 0, sizeof(sres));
393     memset(&sres_hdr, 0, sizeof(sres_hdr));
394
395     com.payload.buf = (void *)buf;
396     com.payload.len = SYNC_PROTO_MAX_LEN;
397     res.payload.buf = (void *) &sres_hdr;
398     res.payload.len = sizeof(sres_hdr);
399     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
400     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
401
402     scom.hdr = &com.hdr;
403     scom.sop = (SALVSYNC_command_hdr *) buf;
404     scom.com = &com;
405     sres.hdr = &res.hdr;
406     sres.sop = &sres_hdr;
407     sres.res = &res;
408
409     SALV_cnt++;
410     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
411         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
412         SALVSYNC_Drop(fd);
413         return;
414     }
415
416     if (com.recv_len < sizeof(com.hdr)) {
417         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
418         res.hdr.response = SYNC_COM_ERROR;
419         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
420         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
421         goto respond;
422     }
423
424     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
425         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
426         res.hdr.response = SYNC_COM_ERROR;
427         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
428         goto respond;
429     }
430
431     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
432         res.hdr.response = SYNC_OK;
433         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
434
435         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
436          * never wait for a response. */
437         goto done;
438     }
439
440     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
441         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
442         res.hdr.response = SYNC_COM_ERROR;
443         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
444         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
445         goto respond;
446     }
447
448     res.hdr.com_seq = com.hdr.com_seq;
449
450     VOL_LOCK;
451     switch (com.hdr.command) {
452     case SALVSYNC_NOP:
453         break;
454     case SALVSYNC_SALVAGE:
455     case SALVSYNC_RAISEPRIO:
456         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
457         break;
458     case SALVSYNC_CANCEL:
459         /* cancel a salvage */
460         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
461         break;
462     case SALVSYNC_CANCELALL:
463         /* cancel all queued salvages */
464         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
465         break;
466     case SALVSYNC_QUERY:
467         /* query whether a volume is done salvaging */
468         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
469         break;
470     case SALVSYNC_OP_LINK:
471         /* link a clone to its parent in the scheduler */
472         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
473         break;
474     default:
475         res.hdr.response = SYNC_BAD_COMMAND;
476         break;
477     }
478
479     sres_hdr.sq_len = salvageQueue.total_len;
480     sres_hdr.pq_len = pendingQueue.len;
481     VOL_UNLOCK;
482
483  respond:
484     SYNC_putRes(&salvsync_server_state, fd, &res);
485
486  done:
487     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
488         SALVSYNC_Drop(fd);
489     }
490 }
491
492 /**
493  * request that a volume be salvaged.
494  *
495  * @param[in]  com  inbound command object
496  * @param[out] res  outbound response object
497  *
498  * @return operation status
499  *    @retval SYNC_OK success
500  *    @retval SYNC_DENIED failed to enqueue request
501  *    @retval SYNC_FAILED malformed command packet
502  *
503  * @note this is a SALVSYNC protocol rpc handler
504  *
505  * @internal
506  *
507  * @post the volume is enqueued in the to-be-salvaged queue.
508  *       if the volume was already in the salvage queue, its
509  *       priority (and thus its location in the queue) are
510  *       updated.
511  */
512 static afs_int32
513 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
514 {
515     afs_int32 code = SYNC_OK;
516     struct SalvageQueueNode * node, * clone;
517     int hash = 0;
518
519     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
520         code = SYNC_FAILED;
521         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
522         goto done;
523     }
524
525     clone = LookupNodeByCommand(com->sop, &node);
526
527     if (node == NULL) {
528         if (AllocNode(&node)) {
529             code = SYNC_DENIED;
530             res->hdr->reason = SYNC_REASON_NOMEM;
531             goto done;
532         }
533         clone = node;
534         hash = 1;
535     }
536
537     HandlePrio(clone, node, com->sop->prio);
538
539     switch (node->state) {
540     case SALVSYNC_STATE_QUEUED:
541         UpdateCommandPrio(node);
542         break;
543
544     case SALVSYNC_STATE_ERROR:
545     case SALVSYNC_STATE_DONE:
546     case SALVSYNC_STATE_UNKNOWN:
547         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
548         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
549
550         /*
551          * make sure volgroup parent partition path is kept coherent
552          *
553          * If we ever want to support non-COW clones on a machine holding
554          * the RW site, please note that this code does not work under the
555          * conditions where someone zaps a COW clone on partition X, and
556          * subsequently creates a full clone on partition Y -- we'd need
557          * an inverse to SALVSYNC_com_Link.
558          *  -- tkeiser 11/28/2007
559          */
560         strcpy(node->command.sop.partName, com->sop->partName);
561
562         if (AddToSalvageQueue(node)) {
563             code = SYNC_DENIED;
564         }
565         break;
566
567     default:
568         break;
569     }
570
571     if (hash) {
572         AddNodeToHash(node);
573     }
574
575     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
576     res->sop->state = node->state;
577     res->sop->prio = node->command.sop.prio;
578
579  done:
580     return code;
581 }
582
583 /**
584  * cancel a pending salvage request.
585  *
586  * @param[in]  com  inbound command object
587  * @param[out] res  outbound response object
588  *
589  * @return operation status
590  *    @retval SYNC_OK success
591  *    @retval SYNC_FAILED malformed command packet
592  *
593  * @note this is a SALVSYNC protocol rpc handler
594  *
595  * @internal
596  */
597 static afs_int32
598 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
599 {
600     afs_int32 code = SYNC_OK;
601     struct SalvageQueueNode * node;
602
603     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
604         code = SYNC_FAILED;
605         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
606         goto done;
607     }
608
609     node = LookupNodeByCommand(com->sop, NULL);
610
611     if (node == NULL) {
612         res->sop->state = SALVSYNC_STATE_UNKNOWN;
613         res->sop->prio = 0;
614     } else {
615         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
616         res->sop->prio = node->command.sop.prio;
617         res->sop->state = node->state;
618         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
619             (node->state == SALVSYNC_STATE_QUEUED)) {
620             DeleteFromSalvageQueue(node);
621         }
622     }
623
624  done:
625     return code;
626 }
627
628 /**
629  * cancel all pending salvage requests.
630  *
631  * @param[in]  com  incoming command object
632  * @param[out] res  outbound response object
633  *
634  * @return operation status
635  *    @retval SYNC_OK success
636  *
637  * @note this is a SALVSYNC protocol rpc handler
638  *
639  * @internal
640  */
641 static afs_int32
642 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
643 {
644     struct SalvageQueueNode * np, *nnp;
645     struct DiskPartition64 * dp;
646
647     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
648         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
649             DeleteFromSalvageQueue(np);
650         }
651     }
652
653     return SYNC_OK;
654 }
655
656 /**
657  * link a queue node for a clone to its parent volume.
658  *
659  * @param[in]  com   inbound command object
660  * @param[out] res   outbound response object
661  *
662  * @return operation status
663  *    @retval SYNC_OK success
664  *    @retval SYNC_FAILED malformed command packet
665  *    @retval SYNC_DENIED the request could not be completed
666  *
667  * @note this is a SALVSYNC protocol rpc handler
668  *
669  * @post the requested volume is marked as a child of another volume.
670  *       thus, future salvage requests for this volume will result in the
671  *       parent of the volume group being scheduled for salvage instead
672  *       of this clone.
673  *
674  * @internal
675  */
676 static afs_int32
677 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
678 {
679     afs_int32 code = SYNC_OK;
680     struct SalvageQueueNode * clone, * parent;
681
682     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
683         code = SYNC_FAILED;
684         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
685         goto done;
686     }
687
688     /* lookup clone's salvage scheduling node */
689     clone = LookupNodeByCommand(com->sop, NULL);
690     if (clone == NULL) {
691         code = SYNC_DENIED;
692         res->hdr->reason = SALVSYNC_REASON_ERROR;
693         goto done;
694     }
695
696     /* lookup parent's salvage scheduling node */
697     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
698     if (parent == NULL) {
699         if (AllocNode(&parent)) {
700             code = SYNC_DENIED;
701             res->hdr->reason = SYNC_REASON_NOMEM;
702             goto done;
703         }
704         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
705         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
706         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
707         AddNodeToHash(parent);
708     }
709
710     if (LinkNode(parent, clone)) {
711         code = SYNC_DENIED;
712         goto done;
713     }
714
715  done:
716     return code;
717 }
718
719 /**
720  * query the status of a volume salvage request.
721  *
722  * @param[in]  com   inbound command object
723  * @param[out] res   outbound response object
724  *
725  * @return operation status
726  *    @retval SYNC_OK success
727  *    @retval SYNC_FAILED malformed command packet
728  *
729  * @note this is a SALVSYNC protocol rpc handler
730  *
731  * @internal
732  */
733 static afs_int32
734 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
735 {
736     afs_int32 code = SYNC_OK;
737     struct SalvageQueueNode * node;
738
739     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
740         code = SYNC_FAILED;
741         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
742         goto done;
743     }
744
745     LookupNodeByCommand(com->sop, &node);
746
747     /* query whether a volume is done salvaging */
748     if (node == NULL) {
749         res->sop->state = SALVSYNC_STATE_UNKNOWN;
750         res->sop->prio = 0;
751     } else {
752         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
753         res->sop->state = node->state;
754         res->sop->prio = node->command.sop.prio;
755     }
756
757  done:
758     return code;
759 }
760
761 static void
762 SALVSYNC_Drop(osi_socket fd)
763 {
764     RemoveHandler(fd);
765 #ifdef AFS_NT40_ENV
766     closesocket(fd);
767 #else
768     close(fd);
769 #endif
770     AcceptOn();
771 }
772
773 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
774
775 static void
776 AcceptOn(void)
777 {
778     if (AcceptHandler == -1) {
779         osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
780         AcceptHandler = FindHandler(salvsync_server_state.fd);
781     }
782 }
783
784 static void
785 AcceptOff(void)
786 {
787     if (AcceptHandler != -1) {
788         osi_Assert(RemoveHandler(salvsync_server_state.fd));
789         AcceptHandler = -1;
790     }
791 }
792
793 /* The multiple FD handling code. */
794
795 static void
796 InitHandler(void)
797 {
798     int i;
799     ObtainWriteLock(&SALVSYNC_handler_lock);
800     for (i = 0; i < MAXHANDLERS; i++) {
801         HandlerFD[i] = OSI_NULLSOCKET;
802         HandlerProc[i] = NULL;
803     }
804     ReleaseWriteLock(&SALVSYNC_handler_lock);
805 }
806
807 static void
808 CallHandler(fd_set * fdsetp)
809 {
810     int i;
811     ObtainReadLock(&SALVSYNC_handler_lock);
812     for (i = 0; i < MAXHANDLERS; i++) {
813         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
814             ReleaseReadLock(&SALVSYNC_handler_lock);
815             (*HandlerProc[i]) (HandlerFD[i]);
816             ObtainReadLock(&SALVSYNC_handler_lock);
817         }
818     }
819     ReleaseReadLock(&SALVSYNC_handler_lock);
820 }
821
822 static int
823 AddHandler(osi_socket afd, void (*aproc) (int))
824 {
825     int i;
826     ObtainWriteLock(&SALVSYNC_handler_lock);
827     for (i = 0; i < MAXHANDLERS; i++)
828         if (HandlerFD[i] == OSI_NULLSOCKET)
829             break;
830     if (i >= MAXHANDLERS) {
831         ReleaseWriteLock(&SALVSYNC_handler_lock);
832         return 0;
833     }
834     HandlerFD[i] = afd;
835     HandlerProc[i] = aproc;
836     ReleaseWriteLock(&SALVSYNC_handler_lock);
837     return 1;
838 }
839
840 static int
841 FindHandler(osi_socket afd)
842 {
843     int i;
844     ObtainReadLock(&SALVSYNC_handler_lock);
845     for (i = 0; i < MAXHANDLERS; i++)
846         if (HandlerFD[i] == afd) {
847             ReleaseReadLock(&SALVSYNC_handler_lock);
848             return i;
849         }
850     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
851     osi_Panic("Failed to find handler\n");
852     return -1;                  /* satisfy compiler */
853 }
854
855 static int
856 FindHandler_r(osi_socket afd)
857 {
858     int i;
859     for (i = 0; i < MAXHANDLERS; i++)
860         if (HandlerFD[i] == afd) {
861             return i;
862         }
863     osi_Panic("Failed to find handler\n");
864     return -1;                  /* satisfy compiler */
865 }
866
867 static int
868 RemoveHandler(osi_socket afd)
869 {
870     ObtainWriteLock(&SALVSYNC_handler_lock);
871     HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
872     ReleaseWriteLock(&SALVSYNC_handler_lock);
873     return 1;
874 }
875
876 static void
877 GetHandler(fd_set * fdsetp, int *maxfdp)
878 {
879     int i;
880     int maxfd = -1;
881     FD_ZERO(fdsetp);
882     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
883     for (i = 0; i < MAXHANDLERS; i++)
884         if (HandlerFD[i] != OSI_NULLSOCKET) {
885             FD_SET(HandlerFD[i], fdsetp);
886 #ifndef AFS_NT40_ENV
887             /* On Windows the nfds parameter to select() is ignored */
888             if (maxfd < HandlerFD[i] || maxfd == (int)-1)
889                 maxfd = HandlerFD[i];
890 #endif
891         }
892     *maxfdp = maxfd;
893     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
894 }
895
896 /**
897  * allocate a salvage queue node.
898  *
899  * @param[out] node_out  address in which to store new node pointer
900  *
901  * @return operation status
902  *    @retval 0 success
903  *    @retval 1 failed to allocate node
904  *
905  * @internal
906  */
907 static int
908 AllocNode(struct SalvageQueueNode ** node_out)
909 {
910     int code = 0;
911     struct SalvageQueueNode * node;
912
913     *node_out = node = (struct SalvageQueueNode *)
914         malloc(sizeof(struct SalvageQueueNode));
915     if (node == NULL) {
916         code = 1;
917         goto done;
918     }
919
920     memset(node, 0, sizeof(struct SalvageQueueNode));
921     node->type = SALVSYNC_VOLGROUP_PARENT;
922     node->state = SALVSYNC_STATE_UNKNOWN;
923
924  done:
925     return code;
926 }
927
928 /**
929  * link a salvage queue node to its parent.
930  *
931  * @param[in] parent  pointer to queue node for parent of volume group
932  * @param[in] clone   pointer to queue node for a clone
933  *
934  * @return operation status
935  *    @retval 0 success
936  *    @retval 1 failure
937  *
938  * @internal
939  */
940 static int
941 LinkNode(struct SalvageQueueNode * parent,
942          struct SalvageQueueNode * clone)
943 {
944     int code = 0;
945     int idx;
946
947     /* check for attaching a clone to a clone */
948     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
949         code = 1;
950         goto done;
951     }
952
953     /* check for pre-existing registration and openings */
954     for (idx = 0; idx < VOLMAXTYPES; idx++) {
955         if (parent->volgroup.children[idx] == clone) {
956             goto linked;
957         }
958         if (parent->volgroup.children[idx] == NULL) {
959             break;
960         }
961     }
962     if (idx == VOLMAXTYPES) {
963         code = 1;
964         goto done;
965     }
966
967     /* link parent and child */
968     parent->volgroup.children[idx] = clone;
969     clone->type = SALVSYNC_VOLGROUP_CLONE;
970     clone->volgroup.parent = parent;
971
972
973  linked:
974     switch (clone->state) {
975     case SALVSYNC_STATE_QUEUED:
976         DeleteFromSalvageQueue(clone);
977
978     case SALVSYNC_STATE_SALVAGING:
979         switch (parent->state) {
980         case SALVSYNC_STATE_UNKNOWN:
981         case SALVSYNC_STATE_ERROR:
982         case SALVSYNC_STATE_DONE:
983             parent->command.sop.prio = clone->command.sop.prio;
984             AddToSalvageQueue(parent);
985             break;
986
987         case SALVSYNC_STATE_QUEUED:
988             if (clone->command.sop.prio) {
989                 parent->command.sop.prio += clone->command.sop.prio;
990                 UpdateCommandPrio(parent);
991             }
992             break;
993
994         default:
995             break;
996         }
997         break;
998
999     default:
1000         break;
1001     }
1002
1003  done:
1004     return code;
1005 }
1006
1007 static void
1008 HandlePrio(struct SalvageQueueNode * clone,
1009            struct SalvageQueueNode * node,
1010            afs_uint32 new_prio)
1011 {
1012     afs_uint32 delta;
1013
1014     switch (node->state) {
1015     case SALVSYNC_STATE_ERROR:
1016     case SALVSYNC_STATE_DONE:
1017     case SALVSYNC_STATE_UNKNOWN:
1018         node->command.sop.prio = 0;
1019         break;
1020     default:
1021         break;
1022     }
1023
1024     if (new_prio < clone->command.sop.prio) {
1025         /* strange. let's just set our delta to 1 */
1026         delta = 1;
1027     } else {
1028         delta = new_prio - clone->command.sop.prio;
1029     }
1030
1031     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1032         clone->command.sop.prio = new_prio;
1033     }
1034
1035     node->command.sop.prio += delta;
1036 }
1037
1038 static int
1039 AddToSalvageQueue(struct SalvageQueueNode * node)
1040 {
1041     afs_int32 id;
1042     struct SalvageQueueNode * last = NULL;
1043
1044     id = volutil_GetPartitionID(node->command.sop.partName);
1045     if (id < 0 || id > VOLMAXPARTS) {
1046         return 1;
1047     }
1048     if (!VGetPartitionById_r(id, 0)) {
1049         /* don't enqueue salvage requests for unmounted partitions */
1050         return 1;
1051     }
1052     if (queue_IsOnQueue(node)) {
1053         return 0;
1054     }
1055
1056     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1057         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1058     }
1059     queue_Append(&salvageQueue.part[id], node);
1060     salvageQueue.len[id]++;
1061     salvageQueue.total_len++;
1062     salvageQueue.last_insert = id;
1063     node->partition_id = id;
1064     node->state = SALVSYNC_STATE_QUEUED;
1065
1066     /* reorder, if necessary */
1067     if (last && last->command.sop.prio < node->command.sop.prio) {
1068         UpdateCommandPrio(node);
1069     }
1070
1071     CV_BROADCAST(&salvageQueue.cv);
1072     return 0;
1073 }
1074
1075 static void
1076 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1077 {
1078     if (queue_IsOnQueue(node)) {
1079         queue_Remove(node);
1080         salvageQueue.len[node->partition_id]--;
1081         salvageQueue.total_len--;
1082         node->state = SALVSYNC_STATE_UNKNOWN;
1083         CV_BROADCAST(&salvageQueue.cv);
1084     }
1085 }
1086
1087 static void
1088 AddToPendingQueue(struct SalvageQueueNode * node)
1089 {
1090     queue_Append(&pendingQueue, node);
1091     pendingQueue.len++;
1092     node->state = SALVSYNC_STATE_SALVAGING;
1093     CV_BROADCAST(&pendingQueue.queue_change_cv);
1094 }
1095
1096 static void
1097 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1098 {
1099     if (queue_IsOnQueue(node)) {
1100         queue_Remove(node);
1101         pendingQueue.len--;
1102         node->state = SALVSYNC_STATE_UNKNOWN;
1103         CV_BROADCAST(&pendingQueue.queue_change_cv);
1104     }
1105 }
1106
1107 #if 0
1108 static struct SalvageQueueNode *
1109 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1110 {
1111     struct SalvageQueueNode * np, * nnp;
1112
1113     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1114         if ((np->command.sop.volume == qry->volume) &&
1115             !strncmp(np->command.sop.partName, qry->partName,
1116                      sizeof(qry->partName)))
1117             break;
1118     }
1119
1120     if (queue_IsEnd(&pendingQueue, np))
1121         np = NULL;
1122     return np;
1123 }
1124 #endif
1125
1126 static struct SalvageQueueNode *
1127 LookupPendingCommandByPid(int pid)
1128 {
1129     struct SalvageQueueNode * np, * nnp;
1130
1131     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1132         if (np->pid == pid)
1133             break;
1134     }
1135
1136     if (queue_IsEnd(&pendingQueue, np))
1137         np = NULL;
1138     return np;
1139 }
1140
1141
1142 /* raise the priority of a previously scheduled salvage */
1143 static void
1144 UpdateCommandPrio(struct SalvageQueueNode * node)
1145 {
1146     struct SalvageQueueNode *np, *nnp;
1147     afs_int32 id;
1148     afs_uint32 prio;
1149
1150     osi_Assert(queue_IsOnQueue(node));
1151
1152     prio = node->command.sop.prio;
1153     id = node->partition_id;
1154     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1155         queue_Remove(node);
1156         queue_Prepend(&salvageQueue.part[id], node);
1157     } else {
1158         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1159             if (np->command.sop.prio > prio)
1160                 break;
1161         }
1162         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1163             queue_Remove(node);
1164             queue_Prepend(&salvageQueue.part[id], node);
1165         } else if (node != np) {
1166             queue_Remove(node);
1167             queue_InsertAfter(np, node);
1168         }
1169     }
1170 }
1171
1172 /* this will need to be rearchitected if we ever want more than one thread
1173  * to wait for new salvage nodes */
1174 struct SalvageQueueNode *
1175 SALVSYNC_getWork(void)
1176 {
1177     int i;
1178     struct DiskPartition64 * dp = NULL, * fdp;
1179     static afs_int32 next_part_sched = 0;
1180     struct SalvageQueueNode *node = NULL;
1181
1182     VOL_LOCK;
1183
1184     /*
1185      * wait for work to be scheduled
1186      * if there are no disk partitions, just sit in this wait loop forever
1187      */
1188     while (!salvageQueue.total_len || !DiskPartitionList) {
1189         VOL_CV_WAIT(&salvageQueue.cv);
1190     }
1191
1192     /*
1193      * short circuit for simple case where only one partition has
1194      * scheduled salvages
1195      */
1196     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1197         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1198         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1199         goto have_node;
1200     }
1201
1202
1203     /*
1204      * ok, more than one partition has scheduled salvages.
1205      * now search for partitions with scheduled salvages, but no pending salvages.
1206      */
1207     dp = VGetPartitionById_r(next_part_sched, 0);
1208     if (!dp) {
1209         dp = DiskPartitionList;
1210     }
1211     fdp = dp;
1212
1213     for (i=0 ;
1214          !i || dp != fdp ;
1215          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1216         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1217             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1218             goto have_node;
1219         }
1220     }
1221
1222
1223     /*
1224      * all partitions with scheduled salvages have at least one pending.
1225      * now do an exhaustive search for a scheduled salvage.
1226      */
1227     dp = fdp;
1228
1229     for (i=0 ;
1230          !i || dp != fdp ;
1231          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1232         if (salvageQueue.len[dp->index]) {
1233             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1234             goto have_node;
1235         }
1236     }
1237
1238     /* we should never reach this line */
1239     osi_Panic("Node not found\n");
1240
1241  have_node:
1242     osi_Assert(node != NULL);
1243     node->pid = 0;
1244     partition_salvaging[node->partition_id]++;
1245     DeleteFromSalvageQueue(node);
1246     AddToPendingQueue(node);
1247
1248     if (dp) {
1249         /* update next_part_sched field */
1250         if (dp->next) {
1251             next_part_sched = dp->next->index;
1252         } else if (DiskPartitionList) {
1253             next_part_sched = DiskPartitionList->index;
1254         } else {
1255             next_part_sched = -1;
1256         }
1257     }
1258
1259     VOL_UNLOCK;
1260     return node;
1261 }
1262
1263 /**
1264  * update internal scheduler state to reflect completion of a work unit.
1265  *
1266  * @param[in]  node    salvage queue node object pointer
1267  * @param[in]  result  worker process result code
1268  *
1269  * @post scheduler state is updated.
1270  *
1271  * @internal
1272  */
1273 static void
1274 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1275 {
1276     afs_int32 partid;
1277     int idx;
1278
1279     DeleteFromPendingQueue(node);
1280     partid = node->partition_id;
1281     if (partid >=0 && partid <= VOLMAXPARTS) {
1282         partition_salvaging[partid]--;
1283     }
1284     if (result == 0) {
1285         node->state = SALVSYNC_STATE_DONE;
1286     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1287         node->state = SALVSYNC_STATE_ERROR;
1288     }
1289
1290     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1291         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1292             if (node->volgroup.children[idx]) {
1293                 node->volgroup.children[idx]->state = node->state;
1294             }
1295         }
1296     }
1297 }
1298
1299 /**
1300  * check whether worker child failed.
1301  *
1302  * @param[in] status  status bitfield return by wait()
1303  *
1304  * @return boolean failure code
1305  *    @retval 0 child succeeded
1306  *    @retval 1 child failed
1307  *
1308  * @internal
1309  */
1310 static int
1311 ChildFailed(int status)
1312 {
1313     return (WCOREDUMP(status) ||
1314             WIFSIGNALED(status) ||
1315             ((WEXITSTATUS(status) != 0) &&
1316              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1317 }
1318
1319
1320 /**
1321  * notify salvsync scheduler of node completion, by child pid.
1322  *
1323  * @param[in]  pid     pid of worker child
1324  * @param[in]  status  worker status bitfield from wait()
1325  *
1326  * @post scheduler state is updated.
1327  *       if status code is a failure, fileserver notification was attempted
1328  *
1329  * @see SALVSYNC_doneWork_r
1330  */
1331 void
1332 SALVSYNC_doneWorkByPid(int pid, int status)
1333 {
1334     struct SalvageQueueNode * node;
1335     char partName[16];
1336     afs_uint32 volids[VOLMAXTYPES+1];
1337     unsigned int idx;
1338
1339     memset(volids, 0, sizeof(volids));
1340
1341     VOL_LOCK;
1342     node = LookupPendingCommandByPid(pid);
1343     if (node != NULL) {
1344         SALVSYNC_doneWork_r(node, status);
1345
1346         if (ChildFailed(status)) {
1347             /* populate volume id list for later processing outside the glock */
1348             volids[0] = node->command.sop.volume;
1349             strcpy(partName, node->command.sop.partName);
1350             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1351                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1352                     if (node->volgroup.children[idx]) {
1353                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1354                     }
1355                 }
1356             }
1357         }
1358     }
1359     VOL_UNLOCK;
1360
1361     /*
1362      * if necessary, notify fileserver of
1363      * failure to salvage volume group
1364      * [we cannot guarantee that the child made the
1365      *  appropriate notifications (e.g. SIGSEGV)]
1366      *  -- tkeiser 11/28/2007
1367      */
1368     if (ChildFailed(status)) {
1369         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1370             if (volids[idx]) {
1371                 FSYNC_VolOp(volids[idx],
1372                             partName,
1373                             FSYNC_VOL_FORCE_ERROR,
1374                             FSYNC_WHATEVER,
1375                             NULL);
1376             }
1377         }
1378     }
1379 }
1380
1381 #endif /* AFS_DEMAND_ATTACH_FS */