Further rationalise our usage of assert()
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 #include <afs/procmgmt.h>
30 #include <roken.h>
31
32 #include <stddef.h>
33
34 #include <afs/opr.h>
35 #include <afs/afsint.h>
36 #include "nfs.h"
37 #include <afs/errors.h>
38 #include "salvsync.h"
39 #include "lwp.h"
40 #include "lock.h"
41 #include <afs/afssyscalls.h>
42 #include "ihandle.h"
43 #include "vnode.h"
44 #include "volume.h"
45 #include "partition.h"
46 #include "common.h"
47 #include <rx/rx_queue.h>
48
49 #ifdef USE_UNIX_SOCKETS
50 #include <afs/afsutil.h>
51 #include <sys/un.h>
52 #endif
53
54 #ifndef WCOREDUMP
55 #define WCOREDUMP(x)    ((x) & 0200)
56 #endif
57
58 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
59                                  * move = dump+restore can run on single server */
60
61
62 /*
63  * This lock controls access to the handler array.
64  */
65 struct Lock SALVSYNC_handler_lock;
66
67
68 #ifdef AFS_DEMAND_ATTACH_FS
69 /*
70  * SALVSYNC is a feature specific to the demand attach fileserver
71  */
72
73 /* Forward declarations */
74 static void * SALVSYNC_syncThread(void *);
75 static void SALVSYNC_newconnection(osi_socket fd);
76 static void SALVSYNC_com(osi_socket fd);
77 static void SALVSYNC_Drop(osi_socket fd);
78 static void AcceptOn(void);
79 static void AcceptOff(void);
80 static void InitHandler(void);
81 static void CallHandler(fd_set * fdsetp);
82 static int AddHandler(osi_socket afd, void (*aproc) (int));
83 static int FindHandler(osi_socket afd);
84 static int FindHandler_r(osi_socket afd);
85 static int RemoveHandler(osi_socket afd);
86 static void GetHandler(fd_set * fdsetp, int *maxfdp);
87
88 static int AllocNode(struct SalvageQueueNode ** node);
89
90 static int AddToSalvageQueue(struct SalvageQueueNode * node);
91 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
92 static void AddToPendingQueue(struct SalvageQueueNode * node);
93 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
94 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
95 static void UpdateCommandPrio(struct SalvageQueueNode * node);
96 static void HandlePrio(struct SalvageQueueNode * clone,
97                        struct SalvageQueueNode * parent,
98                        afs_uint32 new_prio);
99
100 static int LinkNode(struct SalvageQueueNode * parent,
101                     struct SalvageQueueNode * clone);
102
103 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
104                                             struct SalvageQueueNode ** parent);
105 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
106                                                      struct SalvageQueueNode ** parent);
107 static void AddNodeToHash(struct SalvageQueueNode * node);
108
109 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
110 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
111 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
112 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
113 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
114
115
116 extern int LogLevel;
117 extern int VInit;
118 extern pthread_mutex_t vol_salvsync_mutex;
119
120 /**
121  * salvsync server socket handle.
122  */
123 static SYNC_server_state_t salvsync_server_state =
124     { OSI_NULLSOCKET,                       /* file descriptor */
125       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
126       SALVSYNC_PROTO_VERSION,   /* protocol version */
127       5,                        /* bind() retry limit */
128       100,                      /* listen() queue depth */
129       "SALVSYNC",               /* protocol name string */
130     };
131
132
133 /**
134  * queue of all volumes waiting to be salvaged.
135  */
136 struct SalvageQueue {
137     volatile int total_len;
138     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
139     volatile int len[VOLMAXPARTS+1];
140     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
141     pthread_cond_t cv;
142 };
143 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
144
145 /**
146  * queue of all volumes currently being salvaged.
147  */
148 struct QueueHead {
149     volatile struct rx_queue q;  /**< queue of salvages in progress */
150     volatile int len;            /**< length of in-progress queue */
151     pthread_cond_t queue_change_cv;
152 };
153 static struct QueueHead pendingQueue;  /* volumes being salvaged */
154
155 /* XXX
156  * whether a partition has a salvage in progress
157  *
158  * the salvager code only permits one salvage per partition at a time
159  *
160  * the following hack tries to keep salvaged parallelism high by
161  * only permitting one salvage dispatch per partition at a time
162  *
163  * unfortunately, the parallel salvager currently
164  * has a rather braindead routine that won't permit
165  * multiple salvages on the same "device".  this
166  * function happens to break pretty badly on lvm, raid luns, etc.
167  *
168  * this hack isn't good enough to stop the device limiting code from
169  * crippling performance.  someday that code needs to be rewritten
170  */
171 static int partition_salvaging[VOLMAXPARTS+1];
172
173 static int HandlerFD[MAXHANDLERS];
174 static void (*HandlerProc[MAXHANDLERS]) (int);
175
176 #define VSHASH_SIZE 64
177 #define VSHASH_MASK (VSHASH_SIZE-1)
178 #define VSHASH(vid) ((vid)&VSHASH_MASK)
179
180 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
181
182 static struct SalvageQueueNode *
183 LookupNode(afs_uint32 vid, char * partName,
184            struct SalvageQueueNode ** parent)
185 {
186     struct rx_queue *qp, *nqp;
187     struct SalvageQueueNode *vsp = NULL;
188     int idx = VSHASH(vid);
189
190     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
191         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
192         if ((vsp->command.sop.volume == vid) &&
193             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
194             break;
195         }
196     }
197
198     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
199         vsp = NULL;
200     }
201
202     if (parent) {
203         if (vsp) {
204             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
205                 vsp->volgroup.parent : vsp;
206         } else {
207             *parent = NULL;
208         }
209     }
210
211     return vsp;
212 }
213
214 static struct SalvageQueueNode *
215 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
216                     struct SalvageQueueNode ** parent)
217 {
218     return LookupNode(qry->volume, qry->partName, parent);
219 }
220
221 static void
222 AddNodeToHash(struct SalvageQueueNode * node)
223 {
224     int idx = VSHASH(node->command.sop.volume);
225
226     if (queue_IsOnQueue(&node->hash_chain)) {
227         return;
228     }
229
230     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
231     SalvageHashTable[idx].len++;
232 }
233
234 #if 0
235 static void
236 DeleteNodeFromHash(struct SalvageQueueNode * node)
237 {
238     int idx = VSHASH(node->command.sop.volume);
239
240     if (queue_IsNotOnQueue(&node->hash_chain)) {
241         return;
242     }
243
244     queue_Remove(&node->hash_chain);
245     SalvageHashTable[idx].len--;
246 }
247 #endif
248
249 void
250 SALVSYNC_salvInit(void)
251 {
252     int i;
253     pthread_t tid;
254     pthread_attr_t tattr;
255
256     /* initialize the queues */
257     Lock_Init(&SALVSYNC_handler_lock);
258     CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
259     for (i = 0; i <= VOLMAXPARTS; i++) {
260         queue_Init(&salvageQueue.part[i]);
261         salvageQueue.len[i] = 0;
262     }
263     CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
264     queue_Init(&pendingQueue);
265     salvageQueue.total_len = pendingQueue.len = 0;
266     salvageQueue.last_insert = -1;
267     memset(partition_salvaging, 0, sizeof(partition_salvaging));
268
269     for (i = 0; i < VSHASH_SIZE; i++) {
270         CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
271         SalvageHashTable[i].len = 0;
272         queue_Init(&SalvageHashTable[i]);
273     }
274
275     /* start the salvsync thread */
276     opr_Verify(pthread_attr_init(&tattr) == 0);
277     opr_Verify(pthread_attr_setdetachstate(&tattr,
278                                            PTHREAD_CREATE_DETACHED) == 0);
279     opr_Verify(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
280 }
281
282 static void
283 CleanFDs(void)
284 {
285     int i;
286     for (i = 0; i < MAXHANDLERS; ++i) {
287         if (HandlerFD[i] >= 0) {
288             SALVSYNC_Drop(HandlerFD[i]);
289         }
290     }
291
292     /* just in case we were in AcceptOff mode, and thus this fd wouldn't
293      * have a handler */
294     close(salvsync_server_state.fd);
295     salvsync_server_state.fd = OSI_NULLSOCKET;
296 }
297
298 static fd_set SALVSYNC_readfds;
299
300 static void *
301 SALVSYNC_syncThread(void * args)
302 {
303     int code;
304     SYNC_server_state_t * state = &salvsync_server_state;
305
306     /* when we fork, the child needs to close the salvsync server sockets,
307      * otherwise, it may get salvsync requests, instead of the parent
308      * salvageserver */
309     opr_Verify(pthread_atfork(NULL, NULL, CleanFDs) == 0);
310
311     SYNC_getAddr(&state->endpoint, &state->addr);
312     SYNC_cleanupSock(state);
313
314 #ifndef AFS_NT40_ENV
315     (void)signal(SIGPIPE, SIG_IGN);
316 #endif
317
318     state->fd = SYNC_getSock(&state->endpoint);
319     code = SYNC_bindSock(state);
320     opr_Assert(!code);
321
322     InitHandler();
323     AcceptOn();
324
325     for (;;) {
326         int maxfd;
327         struct timeval s_timeout;
328         GetHandler(&SALVSYNC_readfds, &maxfd);
329         s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
330         s_timeout.tv_usec = 0;
331         /* Note: check for >= 1 below is essential since IOMGR_select
332          * doesn't have exactly same semantics as select.
333          */
334         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
335             CallHandler(&SALVSYNC_readfds);
336     }
337
338     return NULL;
339 }
340
341 static void
342 SALVSYNC_newconnection(int afd)
343 {
344 #ifdef USE_UNIX_SOCKETS
345     struct sockaddr_un other;
346 #else  /* USE_UNIX_SOCKETS */
347     struct sockaddr_in other;
348 #endif
349     int fd;
350     socklen_t junk;
351
352     junk = sizeof(other);
353     fd = accept(afd, (struct sockaddr *)&other, &junk);
354     if (fd == OSI_NULLSOCKET) {
355         osi_Panic("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
356     } else if (!AddHandler(fd, SALVSYNC_com)) {
357         AcceptOff();
358         opr_Verify(AddHandler(fd, SALVSYNC_com));
359     }
360 }
361
362 /* this function processes commands from an salvsync file descriptor (fd) */
363 static afs_int32 SALV_cnt = 0;
364 static void
365 SALVSYNC_com(osi_socket fd)
366 {
367     SYNC_command com;
368     SYNC_response res;
369     SALVSYNC_response_hdr sres_hdr;
370     SALVSYNC_command scom;
371     SALVSYNC_response sres;
372     SYNC_PROTO_BUF_DECL(buf);
373
374     memset(&com, 0, sizeof(com));
375     memset(&res, 0, sizeof(res));
376     memset(&scom, 0, sizeof(scom));
377     memset(&sres, 0, sizeof(sres));
378     memset(&sres_hdr, 0, sizeof(sres_hdr));
379
380     com.payload.buf = (void *)buf;
381     com.payload.len = SYNC_PROTO_MAX_LEN;
382     res.payload.buf = (void *) &sres_hdr;
383     res.payload.len = sizeof(sres_hdr);
384     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
385     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
386
387     scom.hdr = &com.hdr;
388     scom.sop = (SALVSYNC_command_hdr *) buf;
389     scom.com = &com;
390     sres.hdr = &res.hdr;
391     sres.sop = &sres_hdr;
392     sres.res = &res;
393
394     SALV_cnt++;
395     if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
396         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
397         SALVSYNC_Drop(fd);
398         return;
399     }
400
401     if (com.recv_len < sizeof(com.hdr)) {
402         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
403         res.hdr.response = SYNC_COM_ERROR;
404         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
405         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
406         goto respond;
407     }
408
409     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
410         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
411         res.hdr.response = SYNC_COM_ERROR;
412         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
413         goto respond;
414     }
415
416     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
417         res.hdr.response = SYNC_OK;
418         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
419
420         /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
421          * never wait for a response. */
422         goto done;
423     }
424
425     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
426         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
427         res.hdr.response = SYNC_COM_ERROR;
428         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
429         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
430         goto respond;
431     }
432
433     res.hdr.com_seq = com.hdr.com_seq;
434
435     VOL_LOCK;
436     switch (com.hdr.command) {
437     case SALVSYNC_NOP:
438         break;
439     case SALVSYNC_SALVAGE:
440     case SALVSYNC_RAISEPRIO:
441         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
442         break;
443     case SALVSYNC_CANCEL:
444         /* cancel a salvage */
445         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
446         break;
447     case SALVSYNC_CANCELALL:
448         /* cancel all queued salvages */
449         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
450         break;
451     case SALVSYNC_QUERY:
452         /* query whether a volume is done salvaging */
453         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
454         break;
455     case SALVSYNC_OP_LINK:
456         /* link a clone to its parent in the scheduler */
457         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
458         break;
459     default:
460         res.hdr.response = SYNC_BAD_COMMAND;
461         break;
462     }
463
464     sres_hdr.sq_len = salvageQueue.total_len;
465     sres_hdr.pq_len = pendingQueue.len;
466     VOL_UNLOCK;
467
468  respond:
469     SYNC_putRes(&salvsync_server_state, fd, &res);
470
471  done:
472     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
473         SALVSYNC_Drop(fd);
474     }
475 }
476
477 /**
478  * request that a volume be salvaged.
479  *
480  * @param[in]  com  inbound command object
481  * @param[out] res  outbound response object
482  *
483  * @return operation status
484  *    @retval SYNC_OK success
485  *    @retval SYNC_DENIED failed to enqueue request
486  *    @retval SYNC_FAILED malformed command packet
487  *
488  * @note this is a SALVSYNC protocol rpc handler
489  *
490  * @internal
491  *
492  * @post the volume is enqueued in the to-be-salvaged queue.
493  *       if the volume was already in the salvage queue, its
494  *       priority (and thus its location in the queue) are
495  *       updated.
496  */
497 static afs_int32
498 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
499 {
500     afs_int32 code = SYNC_OK;
501     struct SalvageQueueNode * node, * clone;
502     int hash = 0;
503
504     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
505         code = SYNC_FAILED;
506         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
507         goto done;
508     }
509
510     clone = LookupNodeByCommand(com->sop, &node);
511
512     if (node == NULL) {
513         if (AllocNode(&node)) {
514             code = SYNC_DENIED;
515             res->hdr->reason = SYNC_REASON_NOMEM;
516             goto done;
517         }
518         clone = node;
519         hash = 1;
520     }
521
522     HandlePrio(clone, node, com->sop->prio);
523
524     switch (node->state) {
525     case SALVSYNC_STATE_QUEUED:
526         UpdateCommandPrio(node);
527         break;
528
529     case SALVSYNC_STATE_ERROR:
530     case SALVSYNC_STATE_DONE:
531     case SALVSYNC_STATE_UNKNOWN:
532         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
533         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
534
535         /*
536          * make sure volgroup parent partition path is kept coherent
537          *
538          * If we ever want to support non-COW clones on a machine holding
539          * the RW site, please note that this code does not work under the
540          * conditions where someone zaps a COW clone on partition X, and
541          * subsequently creates a full clone on partition Y -- we'd need
542          * an inverse to SALVSYNC_com_Link.
543          *  -- tkeiser 11/28/2007
544          */
545         strcpy(node->command.sop.partName, com->sop->partName);
546
547         if (AddToSalvageQueue(node)) {
548             code = SYNC_DENIED;
549         }
550         break;
551
552     default:
553         break;
554     }
555
556     if (hash) {
557         AddNodeToHash(node);
558     }
559
560     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
561     res->sop->state = node->state;
562     res->sop->prio = node->command.sop.prio;
563
564  done:
565     return code;
566 }
567
568 /**
569  * cancel a pending salvage request.
570  *
571  * @param[in]  com  inbound command object
572  * @param[out] res  outbound response object
573  *
574  * @return operation status
575  *    @retval SYNC_OK success
576  *    @retval SYNC_FAILED malformed command packet
577  *
578  * @note this is a SALVSYNC protocol rpc handler
579  *
580  * @internal
581  */
582 static afs_int32
583 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
584 {
585     afs_int32 code = SYNC_OK;
586     struct SalvageQueueNode * node;
587
588     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
589         code = SYNC_FAILED;
590         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
591         goto done;
592     }
593
594     node = LookupNodeByCommand(com->sop, NULL);
595
596     if (node == NULL) {
597         res->sop->state = SALVSYNC_STATE_UNKNOWN;
598         res->sop->prio = 0;
599     } else {
600         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
601         res->sop->prio = node->command.sop.prio;
602         res->sop->state = node->state;
603         if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
604             (node->state == SALVSYNC_STATE_QUEUED)) {
605             DeleteFromSalvageQueue(node);
606         }
607     }
608
609  done:
610     return code;
611 }
612
613 /**
614  * cancel all pending salvage requests.
615  *
616  * @param[in]  com  incoming command object
617  * @param[out] res  outbound response object
618  *
619  * @return operation status
620  *    @retval SYNC_OK success
621  *
622  * @note this is a SALVSYNC protocol rpc handler
623  *
624  * @internal
625  */
626 static afs_int32
627 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
628 {
629     struct SalvageQueueNode * np, *nnp;
630     struct DiskPartition64 * dp;
631
632     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
633         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
634             DeleteFromSalvageQueue(np);
635         }
636     }
637
638     return SYNC_OK;
639 }
640
641 /**
642  * link a queue node for a clone to its parent volume.
643  *
644  * @param[in]  com   inbound command object
645  * @param[out] res   outbound response object
646  *
647  * @return operation status
648  *    @retval SYNC_OK success
649  *    @retval SYNC_FAILED malformed command packet
650  *    @retval SYNC_DENIED the request could not be completed
651  *
652  * @note this is a SALVSYNC protocol rpc handler
653  *
654  * @post the requested volume is marked as a child of another volume.
655  *       thus, future salvage requests for this volume will result in the
656  *       parent of the volume group being scheduled for salvage instead
657  *       of this clone.
658  *
659  * @internal
660  */
661 static afs_int32
662 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
663 {
664     afs_int32 code = SYNC_OK;
665     struct SalvageQueueNode * clone, * parent;
666
667     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
668         code = SYNC_FAILED;
669         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
670         goto done;
671     }
672
673     /* lookup clone's salvage scheduling node */
674     clone = LookupNodeByCommand(com->sop, NULL);
675     if (clone == NULL) {
676         code = SYNC_DENIED;
677         res->hdr->reason = SALVSYNC_REASON_ERROR;
678         goto done;
679     }
680
681     /* lookup parent's salvage scheduling node */
682     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
683     if (parent == NULL) {
684         if (AllocNode(&parent)) {
685             code = SYNC_DENIED;
686             res->hdr->reason = SYNC_REASON_NOMEM;
687             goto done;
688         }
689         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
690         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
691         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
692         AddNodeToHash(parent);
693     }
694
695     if (LinkNode(parent, clone)) {
696         code = SYNC_DENIED;
697         goto done;
698     }
699
700  done:
701     return code;
702 }
703
704 /**
705  * query the status of a volume salvage request.
706  *
707  * @param[in]  com   inbound command object
708  * @param[out] res   outbound response object
709  *
710  * @return operation status
711  *    @retval SYNC_OK success
712  *    @retval SYNC_FAILED malformed command packet
713  *
714  * @note this is a SALVSYNC protocol rpc handler
715  *
716  * @internal
717  */
718 static afs_int32
719 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
720 {
721     afs_int32 code = SYNC_OK;
722     struct SalvageQueueNode * node;
723
724     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
725         code = SYNC_FAILED;
726         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
727         goto done;
728     }
729
730     LookupNodeByCommand(com->sop, &node);
731
732     /* query whether a volume is done salvaging */
733     if (node == NULL) {
734         res->sop->state = SALVSYNC_STATE_UNKNOWN;
735         res->sop->prio = 0;
736     } else {
737         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
738         res->sop->state = node->state;
739         res->sop->prio = node->command.sop.prio;
740     }
741
742  done:
743     return code;
744 }
745
746 static void
747 SALVSYNC_Drop(osi_socket fd)
748 {
749     RemoveHandler(fd);
750     rk_closesocket(fd);
751     AcceptOn();
752 }
753
754 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
755
756 static void
757 AcceptOn(void)
758 {
759     if (AcceptHandler == -1) {
760         opr_Verify(AddHandler(salvsync_server_state.fd,
761                               SALVSYNC_newconnection));
762         AcceptHandler = FindHandler(salvsync_server_state.fd);
763     }
764 }
765
766 static void
767 AcceptOff(void)
768 {
769     if (AcceptHandler != -1) {
770         opr_Verify(RemoveHandler(salvsync_server_state.fd));
771         AcceptHandler = -1;
772     }
773 }
774
775 /* The multiple FD handling code. */
776
777 static void
778 InitHandler(void)
779 {
780     int i;
781     ObtainWriteLock(&SALVSYNC_handler_lock);
782     for (i = 0; i < MAXHANDLERS; i++) {
783         HandlerFD[i] = OSI_NULLSOCKET;
784         HandlerProc[i] = NULL;
785     }
786     ReleaseWriteLock(&SALVSYNC_handler_lock);
787 }
788
789 static void
790 CallHandler(fd_set * fdsetp)
791 {
792     int i;
793     ObtainReadLock(&SALVSYNC_handler_lock);
794     for (i = 0; i < MAXHANDLERS; i++) {
795         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
796             ReleaseReadLock(&SALVSYNC_handler_lock);
797             (*HandlerProc[i]) (HandlerFD[i]);
798             ObtainReadLock(&SALVSYNC_handler_lock);
799         }
800     }
801     ReleaseReadLock(&SALVSYNC_handler_lock);
802 }
803
804 static int
805 AddHandler(osi_socket afd, void (*aproc) (int))
806 {
807     int i;
808     ObtainWriteLock(&SALVSYNC_handler_lock);
809     for (i = 0; i < MAXHANDLERS; i++)
810         if (HandlerFD[i] == OSI_NULLSOCKET)
811             break;
812     if (i >= MAXHANDLERS) {
813         ReleaseWriteLock(&SALVSYNC_handler_lock);
814         return 0;
815     }
816     HandlerFD[i] = afd;
817     HandlerProc[i] = aproc;
818     ReleaseWriteLock(&SALVSYNC_handler_lock);
819     return 1;
820 }
821
822 static int
823 FindHandler(osi_socket afd)
824 {
825     int i;
826     ObtainReadLock(&SALVSYNC_handler_lock);
827     for (i = 0; i < MAXHANDLERS; i++)
828         if (HandlerFD[i] == afd) {
829             ReleaseReadLock(&SALVSYNC_handler_lock);
830             return i;
831         }
832     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
833     osi_Panic("Failed to find handler\n");
834     return -1;                  /* satisfy compiler */
835 }
836
837 static int
838 FindHandler_r(osi_socket afd)
839 {
840     int i;
841     for (i = 0; i < MAXHANDLERS; i++)
842         if (HandlerFD[i] == afd) {
843             return i;
844         }
845     osi_Panic("Failed to find handler\n");
846     return -1;                  /* satisfy compiler */
847 }
848
849 static int
850 RemoveHandler(osi_socket afd)
851 {
852     ObtainWriteLock(&SALVSYNC_handler_lock);
853     HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
854     ReleaseWriteLock(&SALVSYNC_handler_lock);
855     return 1;
856 }
857
858 static void
859 GetHandler(fd_set * fdsetp, int *maxfdp)
860 {
861     int i;
862     int maxfd = -1;
863     FD_ZERO(fdsetp);
864     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
865     for (i = 0; i < MAXHANDLERS; i++)
866         if (HandlerFD[i] != OSI_NULLSOCKET) {
867             FD_SET(HandlerFD[i], fdsetp);
868 #ifndef AFS_NT40_ENV
869             /* On Windows the nfds parameter to select() is ignored */
870             if (maxfd < HandlerFD[i] || maxfd == (int)-1)
871                 maxfd = HandlerFD[i];
872 #endif
873         }
874     *maxfdp = maxfd;
875     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
876 }
877
878 /**
879  * allocate a salvage queue node.
880  *
881  * @param[out] node_out  address in which to store new node pointer
882  *
883  * @return operation status
884  *    @retval 0 success
885  *    @retval 1 failed to allocate node
886  *
887  * @internal
888  */
889 static int
890 AllocNode(struct SalvageQueueNode ** node_out)
891 {
892     int code = 0;
893     struct SalvageQueueNode * node;
894
895     *node_out = node = calloc(1, sizeof(struct SalvageQueueNode));
896     if (node == NULL) {
897         code = 1;
898         goto done;
899     }
900
901     node->type = SALVSYNC_VOLGROUP_PARENT;
902     node->state = SALVSYNC_STATE_UNKNOWN;
903
904  done:
905     return code;
906 }
907
908 /**
909  * link a salvage queue node to its parent.
910  *
911  * @param[in] parent  pointer to queue node for parent of volume group
912  * @param[in] clone   pointer to queue node for a clone
913  *
914  * @return operation status
915  *    @retval 0 success
916  *    @retval 1 failure
917  *
918  * @internal
919  */
920 static int
921 LinkNode(struct SalvageQueueNode * parent,
922          struct SalvageQueueNode * clone)
923 {
924     int code = 0;
925     int idx;
926
927     /* check for attaching a clone to a clone */
928     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
929         code = 1;
930         goto done;
931     }
932
933     /* check for pre-existing registration and openings */
934     for (idx = 0; idx < VOLMAXTYPES; idx++) {
935         if (parent->volgroup.children[idx] == clone) {
936             goto linked;
937         }
938         if (parent->volgroup.children[idx] == NULL) {
939             break;
940         }
941     }
942     if (idx == VOLMAXTYPES) {
943         code = 1;
944         goto done;
945     }
946
947     /* link parent and child */
948     parent->volgroup.children[idx] = clone;
949     clone->type = SALVSYNC_VOLGROUP_CLONE;
950     clone->volgroup.parent = parent;
951
952
953  linked:
954     switch (clone->state) {
955     case SALVSYNC_STATE_QUEUED:
956         DeleteFromSalvageQueue(clone);
957
958     case SALVSYNC_STATE_SALVAGING:
959         switch (parent->state) {
960         case SALVSYNC_STATE_UNKNOWN:
961         case SALVSYNC_STATE_ERROR:
962         case SALVSYNC_STATE_DONE:
963             parent->command.sop.prio = clone->command.sop.prio;
964             AddToSalvageQueue(parent);
965             break;
966
967         case SALVSYNC_STATE_QUEUED:
968             if (clone->command.sop.prio) {
969                 parent->command.sop.prio += clone->command.sop.prio;
970                 UpdateCommandPrio(parent);
971             }
972             break;
973
974         default:
975             break;
976         }
977         break;
978
979     default:
980         break;
981     }
982
983  done:
984     return code;
985 }
986
987 static void
988 HandlePrio(struct SalvageQueueNode * clone,
989            struct SalvageQueueNode * node,
990            afs_uint32 new_prio)
991 {
992     afs_uint32 delta;
993
994     switch (node->state) {
995     case SALVSYNC_STATE_ERROR:
996     case SALVSYNC_STATE_DONE:
997     case SALVSYNC_STATE_UNKNOWN:
998         node->command.sop.prio = 0;
999         break;
1000     default:
1001         break;
1002     }
1003
1004     if (new_prio < clone->command.sop.prio) {
1005         /* strange. let's just set our delta to 1 */
1006         delta = 1;
1007     } else {
1008         delta = new_prio - clone->command.sop.prio;
1009     }
1010
1011     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1012         clone->command.sop.prio = new_prio;
1013     }
1014
1015     node->command.sop.prio += delta;
1016 }
1017
1018 static int
1019 AddToSalvageQueue(struct SalvageQueueNode * node)
1020 {
1021     afs_int32 id;
1022     struct SalvageQueueNode * last = NULL;
1023
1024     id = volutil_GetPartitionID(node->command.sop.partName);
1025     if (id < 0 || id > VOLMAXPARTS) {
1026         return 1;
1027     }
1028     if (!VGetPartitionById_r(id, 0)) {
1029         /* don't enqueue salvage requests for unmounted partitions */
1030         return 1;
1031     }
1032     if (queue_IsOnQueue(node)) {
1033         return 0;
1034     }
1035
1036     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1037         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1038     }
1039     queue_Append(&salvageQueue.part[id], node);
1040     salvageQueue.len[id]++;
1041     salvageQueue.total_len++;
1042     salvageQueue.last_insert = id;
1043     node->partition_id = id;
1044     node->state = SALVSYNC_STATE_QUEUED;
1045
1046     /* reorder, if necessary */
1047     if (last && last->command.sop.prio < node->command.sop.prio) {
1048         UpdateCommandPrio(node);
1049     }
1050
1051     CV_BROADCAST(&salvageQueue.cv);
1052     return 0;
1053 }
1054
1055 static void
1056 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1057 {
1058     if (queue_IsOnQueue(node)) {
1059         queue_Remove(node);
1060         salvageQueue.len[node->partition_id]--;
1061         salvageQueue.total_len--;
1062         node->state = SALVSYNC_STATE_UNKNOWN;
1063         CV_BROADCAST(&salvageQueue.cv);
1064     }
1065 }
1066
1067 static void
1068 AddToPendingQueue(struct SalvageQueueNode * node)
1069 {
1070     queue_Append(&pendingQueue, node);
1071     pendingQueue.len++;
1072     node->state = SALVSYNC_STATE_SALVAGING;
1073     CV_BROADCAST(&pendingQueue.queue_change_cv);
1074 }
1075
1076 static void
1077 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1078 {
1079     if (queue_IsOnQueue(node)) {
1080         queue_Remove(node);
1081         pendingQueue.len--;
1082         node->state = SALVSYNC_STATE_UNKNOWN;
1083         CV_BROADCAST(&pendingQueue.queue_change_cv);
1084     }
1085 }
1086
1087 #if 0
1088 static struct SalvageQueueNode *
1089 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1090 {
1091     struct SalvageQueueNode * np, * nnp;
1092
1093     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1094         if ((np->command.sop.volume == qry->volume) &&
1095             !strncmp(np->command.sop.partName, qry->partName,
1096                      sizeof(qry->partName)))
1097             break;
1098     }
1099
1100     if (queue_IsEnd(&pendingQueue, np))
1101         np = NULL;
1102     return np;
1103 }
1104 #endif
1105
1106 static struct SalvageQueueNode *
1107 LookupPendingCommandByPid(int pid)
1108 {
1109     struct SalvageQueueNode * np, * nnp;
1110
1111     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1112         if (np->pid == pid)
1113             break;
1114     }
1115
1116     if (queue_IsEnd(&pendingQueue, np))
1117         np = NULL;
1118     return np;
1119 }
1120
1121
1122 /* raise the priority of a previously scheduled salvage */
1123 static void
1124 UpdateCommandPrio(struct SalvageQueueNode * node)
1125 {
1126     struct SalvageQueueNode *np, *nnp;
1127     afs_int32 id;
1128     afs_uint32 prio;
1129
1130     opr_Assert(queue_IsOnQueue(node));
1131
1132     prio = node->command.sop.prio;
1133     id = node->partition_id;
1134     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1135         queue_Remove(node);
1136         queue_Prepend(&salvageQueue.part[id], node);
1137     } else {
1138         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1139             if (np->command.sop.prio > prio)
1140                 break;
1141         }
1142         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1143             queue_Remove(node);
1144             queue_Prepend(&salvageQueue.part[id], node);
1145         } else if (node != np) {
1146             queue_Remove(node);
1147             queue_InsertAfter(np, node);
1148         }
1149     }
1150 }
1151
1152 /* this will need to be rearchitected if we ever want more than one thread
1153  * to wait for new salvage nodes */
1154 struct SalvageQueueNode *
1155 SALVSYNC_getWork(void)
1156 {
1157     int i;
1158     struct DiskPartition64 * dp = NULL, * fdp;
1159     static afs_int32 next_part_sched = 0;
1160     struct SalvageQueueNode *node = NULL;
1161
1162     VOL_LOCK;
1163
1164     /*
1165      * wait for work to be scheduled
1166      * if there are no disk partitions, just sit in this wait loop forever
1167      */
1168     while (!salvageQueue.total_len || !DiskPartitionList) {
1169         VOL_CV_WAIT(&salvageQueue.cv);
1170     }
1171
1172     /*
1173      * short circuit for simple case where only one partition has
1174      * scheduled salvages
1175      */
1176     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1177         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1178         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1179         goto have_node;
1180     }
1181
1182
1183     /*
1184      * ok, more than one partition has scheduled salvages.
1185      * now search for partitions with scheduled salvages, but no pending salvages.
1186      */
1187     dp = VGetPartitionById_r(next_part_sched, 0);
1188     if (!dp) {
1189         dp = DiskPartitionList;
1190     }
1191     fdp = dp;
1192
1193     for (i=0 ;
1194          !i || dp != fdp ;
1195          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1196         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1197             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1198             goto have_node;
1199         }
1200     }
1201
1202
1203     /*
1204      * all partitions with scheduled salvages have at least one pending.
1205      * now do an exhaustive search for a scheduled salvage.
1206      */
1207     dp = fdp;
1208
1209     for (i=0 ;
1210          !i || dp != fdp ;
1211          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1212         if (salvageQueue.len[dp->index]) {
1213             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1214             goto have_node;
1215         }
1216     }
1217
1218     /* we should never reach this line */
1219     osi_Panic("Node not found\n");
1220
1221  have_node:
1222     opr_Assert(node != NULL);
1223     node->pid = 0;
1224     partition_salvaging[node->partition_id]++;
1225     DeleteFromSalvageQueue(node);
1226     AddToPendingQueue(node);
1227
1228     if (dp) {
1229         /* update next_part_sched field */
1230         if (dp->next) {
1231             next_part_sched = dp->next->index;
1232         } else if (DiskPartitionList) {
1233             next_part_sched = DiskPartitionList->index;
1234         } else {
1235             next_part_sched = -1;
1236         }
1237     }
1238
1239     VOL_UNLOCK;
1240     return node;
1241 }
1242
1243 /**
1244  * update internal scheduler state to reflect completion of a work unit.
1245  *
1246  * @param[in]  node    salvage queue node object pointer
1247  * @param[in]  result  worker process result code
1248  *
1249  * @post scheduler state is updated.
1250  *
1251  * @internal
1252  */
1253 static void
1254 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1255 {
1256     afs_int32 partid;
1257     int idx;
1258
1259     DeleteFromPendingQueue(node);
1260     partid = node->partition_id;
1261     if (partid >=0 && partid <= VOLMAXPARTS) {
1262         partition_salvaging[partid]--;
1263     }
1264     if (result == 0) {
1265         node->state = SALVSYNC_STATE_DONE;
1266     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1267         node->state = SALVSYNC_STATE_ERROR;
1268     }
1269
1270     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1271         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1272             if (node->volgroup.children[idx]) {
1273                 node->volgroup.children[idx]->state = node->state;
1274             }
1275         }
1276     }
1277 }
1278
1279 /**
1280  * check whether worker child failed.
1281  *
1282  * @param[in] status  status bitfield return by wait()
1283  *
1284  * @return boolean failure code
1285  *    @retval 0 child succeeded
1286  *    @retval 1 child failed
1287  *
1288  * @internal
1289  */
1290 static int
1291 ChildFailed(int status)
1292 {
1293     return (WCOREDUMP(status) ||
1294             WIFSIGNALED(status) ||
1295             ((WEXITSTATUS(status) != 0) &&
1296              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1297 }
1298
1299
1300 /**
1301  * notify salvsync scheduler of node completion, by child pid.
1302  *
1303  * @param[in]  pid     pid of worker child
1304  * @param[in]  status  worker status bitfield from wait()
1305  *
1306  * @post scheduler state is updated.
1307  *       if status code is a failure, fileserver notification was attempted
1308  *
1309  * @see SALVSYNC_doneWork_r
1310  */
1311 void
1312 SALVSYNC_doneWorkByPid(int pid, int status)
1313 {
1314     struct SalvageQueueNode * node;
1315     char partName[16];
1316     afs_uint32 volids[VOLMAXTYPES+1];
1317     unsigned int idx;
1318
1319     memset(volids, 0, sizeof(volids));
1320
1321     VOL_LOCK;
1322     node = LookupPendingCommandByPid(pid);
1323     if (node != NULL) {
1324         SALVSYNC_doneWork_r(node, status);
1325
1326         if (ChildFailed(status)) {
1327             /* populate volume id list for later processing outside the glock */
1328             volids[0] = node->command.sop.volume;
1329             strcpy(partName, node->command.sop.partName);
1330             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1331                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1332                     if (node->volgroup.children[idx]) {
1333                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1334                     }
1335                 }
1336             }
1337         }
1338     }
1339     VOL_UNLOCK;
1340
1341     /*
1342      * if necessary, notify fileserver of
1343      * failure to salvage volume group
1344      * [we cannot guarantee that the child made the
1345      *  appropriate notifications (e.g. SIGSEGV)]
1346      *  -- tkeiser 11/28/2007
1347      */
1348     if (ChildFailed(status)) {
1349         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1350             if (volids[idx]) {
1351                 FSYNC_VolOp(volids[idx],
1352                             partName,
1353                             FSYNC_VOL_FORCE_ERROR,
1354                             FSYNC_WHATEVER,
1355                             NULL);
1356             }
1357         }
1358     }
1359 }
1360
1361 #endif /* AFS_DEMAND_ATTACH_FS */