dafs-vnode-locking-20080204
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 RCSID
30     ("$Header$");
31
32 #include <sys/types.h>
33 #include <stdio.h>
34 #ifdef AFS_NT40_ENV
35 #include <winsock2.h>
36 #include <time.h>
37 #else
38 #include <sys/param.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <netdb.h>
42 #include <sys/time.h>
43 #endif
44 #include <errno.h>
45 #include <assert.h>
46 #include <signal.h>
47 #include <string.h>
48
49
50 #include <rx/xdr.h>
51 #include <afs/afsint.h>
52 #include "nfs.h"
53 #include <afs/errors.h>
54 #include "salvsync.h"
55 #include "lwp.h"
56 #include "lock.h"
57 #include <afs/afssyscalls.h>
58 #include "ihandle.h"
59 #include "vnode.h"
60 #include "volume.h"
61 #include "partition.h"
62 #include <rx/rx_queue.h>
63 #include <afs/procmgmt.h>
64
65 #if !defined(offsetof)
66 #include <stddef.h>
67 #endif
68
69 #ifdef USE_UNIX_SOCKETS
70 #include <afs/afsutil.h>
71 #include <sys/un.h>
72 #endif
73
74
75 /*@printflike@*/ extern void Log(const char *format, ...);
76
77 #ifdef osi_Assert
78 #undef osi_Assert
79 #endif
80 #define osi_Assert(e) (void)(e)
81
82 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
83                                  * move = dump+restore can run on single server */
84
85 /* Forward declarations */
86 static void * SALVSYNC_syncThread(void *);
87 static void SALVSYNC_newconnection(int fd);
88 static void SALVSYNC_com(int fd);
89 static void SALVSYNC_Drop(int fd);
90 static void AcceptOn(void);
91 static void AcceptOff(void);
92 static void InitHandler(void);
93 static void CallHandler(fd_set * fdsetp);
94 static int AddHandler(int afd, void (*aproc) (int));
95 static int FindHandler(register int afd);
96 static int FindHandler_r(register int afd);
97 static int RemoveHandler(register int afd);
98 static void GetHandler(fd_set * fdsetp, int *maxfdp);
99
100
101 /*
102  * This lock controls access to the handler array.
103  */
104 struct Lock SALVSYNC_handler_lock;
105
106
107 #ifdef AFS_DEMAND_ATTACH_FS
108 /*
109  * SALVSYNC is a feature specific to the demand attach fileserver
110  */
111
112 static int AllocNode(struct SalvageQueueNode ** node);
113
114 static int AddToSalvageQueue(struct SalvageQueueNode * node);
115 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
116 static void AddToPendingQueue(struct SalvageQueueNode * node);
117 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
118 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
119 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
120 static void UpdateCommandPrio(struct SalvageQueueNode * node);
121 static void HandlePrio(struct SalvageQueueNode * clone, 
122                        struct SalvageQueueNode * parent,
123                        afs_uint32 new_prio);
124
125 static int LinkNode(struct SalvageQueueNode * parent,
126                     struct SalvageQueueNode * clone);
127
128 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName, 
129                                             struct SalvageQueueNode ** parent);
130 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
131                                                      struct SalvageQueueNode ** parent);
132 static void AddNodeToHash(struct SalvageQueueNode * node);
133 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
134
135 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
136 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
137 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
138 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
139 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
140
141
142 extern int LogLevel;
143 extern int VInit;
144 extern pthread_mutex_t vol_salvsync_mutex;
145
146 /**
147  * salvsync server socket handle.
148  */
149 static SYNC_server_state_t salvsync_server_state = 
150     { -1,                       /* file descriptor */
151       SALVSYNC_ENDPOINT_DECL,   /* server endpoint */
152       SALVSYNC_PROTO_VERSION,   /* protocol version */
153       5,                        /* bind() retry limit */
154       100,                      /* listen() queue depth */
155       "SALVSYNC",               /* protocol name string */
156     };
157
158
159 /**
160  * queue of all volumes waiting to be salvaged.
161  */
162 struct SalvageQueue {
163     volatile int total_len;
164     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
165     volatile int len[VOLMAXPARTS+1];
166     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
167     pthread_cond_t cv;
168 };
169 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
170
171 /**
172  * queue of all volumes currently being salvaged.
173  */
174 struct QueueHead {
175     volatile struct rx_queue q;  /**< queue of salvages in progress */
176     volatile int len;            /**< length of in-progress queue */
177     pthread_cond_t queue_change_cv;
178 };
179 static struct QueueHead pendingQueue;  /* volumes being salvaged */
180
181 /* XXX
182  * whether a partition has a salvage in progress
183  *
184  * the salvager code only permits one salvage per partition at a time
185  *
186  * the following hack tries to keep salvaged parallelism high by
187  * only permitting one salvage dispatch per partition at a time
188  *
189  * unfortunately, the parallel salvager currently
190  * has a rather braindead routine that won't permit
191  * multiple salvages on the same "device".  this
192  * function happens to break pretty badly on lvm, raid luns, etc.
193  *
194  * this hack isn't good enough to stop the device limiting code from
195  * crippling performance.  someday that code needs to be rewritten
196  */
197 static int partition_salvaging[VOLMAXPARTS+1];
198
199 #define VSHASH_SIZE 64
200 #define VSHASH_MASK (VSHASH_SIZE-1)
201 #define VSHASH(vid) ((vid)&VSHASH_MASK)
202
203 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
204
205 static struct SalvageQueueNode *
206 LookupNode(afs_uint32 vid, char * partName,
207            struct SalvageQueueNode ** parent)
208 {
209     struct rx_queue *qp, *nqp;
210     struct SalvageQueueNode *vsp;
211     int idx = VSHASH(vid);
212
213     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
214         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
215         if ((vsp->command.sop.volume == vid) &&
216             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
217             break;
218         }
219     }
220
221     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
222         vsp = NULL;
223     }
224
225     if (parent) {
226         if (vsp) {
227             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
228                 vsp->volgroup.parent : vsp;
229         } else {
230             *parent = NULL;
231         }
232     }
233
234     return vsp;
235 }
236
237 static struct SalvageQueueNode *
238 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
239                     struct SalvageQueueNode ** parent)
240 {
241     return LookupNode(qry->volume, qry->partName, parent);
242 }
243
244 static void
245 AddNodeToHash(struct SalvageQueueNode * node)
246 {
247     int idx = VSHASH(node->command.sop.volume);
248
249     if (queue_IsOnQueue(&node->hash_chain)) {
250         return;
251     }
252
253     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
254     SalvageHashTable[idx].len++;
255 }
256
257 static void
258 DeleteNodeFromHash(struct SalvageQueueNode * node)
259 {
260     int idx = VSHASH(node->command.sop.volume);
261
262     if (queue_IsNotOnQueue(&node->hash_chain)) {
263         return;
264     }
265
266     queue_Remove(&node->hash_chain);
267     SalvageHashTable[idx].len--;
268 }
269
270 void
271 SALVSYNC_salvInit(void)
272 {
273     int i;
274     pthread_t tid;
275     pthread_attr_t tattr;
276
277     /* initialize the queues */
278     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
279     for (i = 0; i <= VOLMAXPARTS; i++) {
280         queue_Init(&salvageQueue.part[i]);
281         salvageQueue.len[i] = 0;
282     }
283     assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
284     queue_Init(&pendingQueue);
285     salvageQueue.total_len = pendingQueue.len = 0;
286     salvageQueue.last_insert = -1;
287     memset(partition_salvaging, 0, sizeof(partition_salvaging));
288
289     for (i = 0; i < VSHASH_SIZE; i++) {
290         assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
291         SalvageHashTable[i].len = 0;
292         queue_Init(&SalvageHashTable[i]);
293     }
294
295     /* start the salvsync thread */
296     assert(pthread_attr_init(&tattr) == 0);
297     assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
298     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
299 }
300
301
302 static fd_set SALVSYNC_readfds;
303
304 static void *
305 SALVSYNC_syncThread(void * args)
306 {
307     int on = 1;
308     int code;
309     int numTries;
310     int tid;
311     SYNC_server_state_t * state = &salvsync_server_state;
312
313     SYNC_getAddr(&state->endpoint, &state->addr);
314     SYNC_cleanupSock(state);
315
316 #ifndef AFS_NT40_ENV
317     (void)signal(SIGPIPE, SIG_IGN);
318 #endif
319
320     /* set our 'thread-id' so that the host hold table works */
321     MUTEX_ENTER(&rx_stats_mutex);       /* protects rxi_pthread_hinum */
322     tid = ++rxi_pthread_hinum;
323     MUTEX_EXIT(&rx_stats_mutex);
324     pthread_setspecific(rx_thread_id_key, (void *)tid);
325     Log("Set thread id %d for SALVSYNC_syncThread\n", tid);
326
327     state->fd = SYNC_getSock(&state->endpoint);
328     code = SYNC_bindSock(state);
329     assert(!code);
330
331     InitHandler();
332     AcceptOn();
333
334     for (;;) {
335         int maxfd;
336         GetHandler(&SALVSYNC_readfds, &maxfd);
337         /* Note: check for >= 1 below is essential since IOMGR_select
338          * doesn't have exactly same semantics as select.
339          */
340         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
341             CallHandler(&SALVSYNC_readfds);
342     }
343
344     return NULL;
345 }
346
347 static void
348 SALVSYNC_newconnection(int afd)
349 {
350 #ifdef USE_UNIX_SOCKETS
351     struct sockaddr_un other;
352 #else  /* USE_UNIX_SOCKETS */
353     struct sockaddr_in other;
354 #endif
355     int junk, fd;
356     junk = sizeof(other);
357     fd = accept(afd, (struct sockaddr *)&other, &junk);
358     if (fd == -1) {
359         Log("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
360         assert(1 == 2);
361     } else if (!AddHandler(fd, SALVSYNC_com)) {
362         AcceptOff();
363         assert(AddHandler(fd, SALVSYNC_com));
364     }
365 }
366
367 /* this function processes commands from an salvsync file descriptor (fd) */
368 static afs_int32 SALV_cnt = 0;
369 static void
370 SALVSYNC_com(int fd)
371 {
372     SYNC_command com;
373     SYNC_response res;
374     SALVSYNC_response_hdr sres_hdr;
375     SALVSYNC_command scom;
376     SALVSYNC_response sres;
377     SYNC_PROTO_BUF_DECL(buf);
378     
379     com.payload.buf = (void *)buf;
380     com.payload.len = SYNC_PROTO_MAX_LEN;
381     res.payload.buf = (void *) &sres_hdr;
382     res.payload.len = sizeof(sres_hdr);
383     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
384     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
385
386     scom.hdr = &com.hdr;
387     scom.sop = (SALVSYNC_command_hdr *) buf;
388     scom.com = &com;
389     sres.hdr = &res.hdr;
390     sres.sop = &sres_hdr;
391     sres.res = &res;
392
393     SALV_cnt++;
394     if (SYNC_getCom(fd, &com)) {
395         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
396         SALVSYNC_Drop(fd);
397         return;
398     }
399
400     if (com.recv_len < sizeof(com.hdr)) {
401         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
402         res.hdr.response = SYNC_COM_ERROR;
403         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
404         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
405         goto respond;
406     }
407
408     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
409         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
410         res.hdr.response = SYNC_COM_ERROR;
411         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
412         goto respond;
413     }
414
415     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
416         res.hdr.response = SYNC_OK;
417         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
418         goto respond;
419     }
420
421     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
422         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
423         res.hdr.response = SYNC_COM_ERROR;
424         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
425         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
426         goto respond;
427     }
428
429     VOL_LOCK;
430     switch (com.hdr.command) {
431     case SALVSYNC_NOP:
432         break;
433     case SALVSYNC_SALVAGE:
434     case SALVSYNC_RAISEPRIO:
435         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
436         break;
437     case SALVSYNC_CANCEL:
438         /* cancel a salvage */
439         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
440         break;
441     case SALVSYNC_CANCELALL:
442         /* cancel all queued salvages */
443         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
444         break;
445     case SALVSYNC_QUERY:
446         /* query whether a volume is done salvaging */
447         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
448         break;
449     case SALVSYNC_OP_LINK:
450         /* link a clone to its parent in the scheduler */
451         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
452         break;
453     default:
454         res.hdr.response = SYNC_BAD_COMMAND;
455         break;
456     }
457
458     sres_hdr.sq_len = salvageQueue.total_len;
459     sres_hdr.pq_len = pendingQueue.len;
460     VOL_UNLOCK;
461
462  respond:
463     SYNC_putRes(fd, &res);
464     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
465         SALVSYNC_Drop(fd);
466     }
467 }
468
469 /**
470  * request that a volume be salvaged.
471  *
472  * @param[in]  com  inbound command object
473  * @param[out] res  outbound response object
474  *
475  * @return operation status
476  *    @retval SYNC_OK success
477  *    @retval SYNC_DENIED failed to enqueue request
478  *    @retval SYNC_FAILED malformed command packet
479  *
480  * @note this is a SALVSYNC protocol rpc handler
481  *
482  * @internal
483  *
484  * @post the volume is enqueued in the to-be-salvaged queue.  
485  *       if the volume was already in the salvage queue, its 
486  *       priority (and thus its location in the queue) are 
487  *       updated.
488  */
489 static afs_int32
490 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
491 {
492     afs_int32 code = SYNC_OK;
493     struct SalvageQueueNode * node, * clone;
494     int hash = 0;
495
496     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
497         code = SYNC_FAILED;
498         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
499         goto done;
500     }
501
502     clone = LookupNodeByCommand(com->sop, &node);
503
504     if (node == NULL) {
505         if (AllocNode(&node)) {
506             code = SYNC_DENIED;
507             res->hdr->reason = SYNC_REASON_NOMEM;
508             goto done;
509         }
510         clone = node;
511         hash = 1;
512     }
513
514     HandlePrio(clone, node, com->sop->prio);
515
516     switch (node->state) {
517     case SALVSYNC_STATE_QUEUED:
518         UpdateCommandPrio(node);
519         break;
520
521     case SALVSYNC_STATE_ERROR:
522     case SALVSYNC_STATE_DONE:
523     case SALVSYNC_STATE_UNKNOWN:
524         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
525         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
526
527         /* 
528          * make sure volgroup parent partition path is kept coherent
529          *
530          * If we ever want to support non-COW clones on a machine holding
531          * the RW site, please note that this code does not work under the
532          * conditions where someone zaps a COW clone on partition X, and
533          * subsequently creates a full clone on partition Y -- we'd need
534          * an inverse to SALVSYNC_com_Link.
535          *  -- tkeiser 11/28/2007
536          */
537         strcpy(node->command.sop.partName, com->sop->partName);
538
539         if (AddToSalvageQueue(node)) {
540             code = SYNC_DENIED;
541         }
542         break;
543
544     default:
545         break;
546     }
547
548     if (hash) {
549         AddNodeToHash(node);
550     }
551
552     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
553     res->sop->state = node->state;
554     res->sop->prio = node->command.sop.prio;
555
556  done:
557     return code;
558 }
559
560 /**
561  * cancel a pending salvage request.
562  *
563  * @param[in]  com  inbound command object
564  * @param[out] res  outbound response object
565  *
566  * @return operation status
567  *    @retval SYNC_OK success
568  *    @retval SYNC_FAILED malformed command packet
569  *
570  * @note this is a SALVSYNC protocol rpc handler
571  *
572  * @internal
573  */
574 static afs_int32
575 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
576 {
577     afs_int32 code = SYNC_OK;
578     struct SalvageQueueNode * node;
579
580     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
581         code = SYNC_FAILED;
582         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
583         goto done;
584     }
585
586     node = LookupNodeByCommand(com->sop, NULL);
587
588     if (node == NULL) {
589         res->sop->state = SALVSYNC_STATE_UNKNOWN;
590         res->sop->prio = 0;
591     } else {
592         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
593         res->sop->prio = node->command.sop.prio;
594         res->sop->state = node->state;
595         if ((node->type == SALVSYNC_VOLGROUP_PARENT) && 
596             (node->state == SALVSYNC_STATE_QUEUED)) {
597             DeleteFromSalvageQueue(node);
598         }
599     }
600
601  done:
602     return code;
603 }
604
605 /**
606  * cancel all pending salvage requests.
607  *
608  * @param[in]  com  incoming command object
609  * @param[out] res  outbound response object
610  *
611  * @return operation status
612  *    @retval SYNC_OK success
613  *
614  * @note this is a SALVSYNC protocol rpc handler
615  *
616  * @internal
617  */
618 static afs_int32
619 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
620 {
621     struct SalvageQueueNode * np, *nnp;
622     struct DiskPartition * dp;
623
624     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
625         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
626             DeleteFromSalvageQueue(np);
627         }
628     }
629
630     return SYNC_OK;
631 }
632
633 /**
634  * link a queue node for a clone to its parent volume.
635  *
636  * @param[in]  com   inbound command object
637  * @param[out] res   outbound response object
638  *
639  * @return operation status
640  *    @retval SYNC_OK success
641  *    @retval SYNC_FAILED malformed command packet
642  *    @retval SYNC_DENIED the request could not be completed
643  *
644  * @note this is a SALVSYNC protocol rpc handler
645  *
646  * @post the requested volume is marked as a child of another volume.
647  *       thus, future salvage requests for this volume will result in the
648  *       parent of the volume group being scheduled for salvage instead
649  *       of this clone.
650  *
651  * @internal
652  */
653 static afs_int32
654 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
655 {
656     afs_int32 code = SYNC_OK;
657     struct SalvageQueueNode * clone, * parent;
658
659     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
660         code = SYNC_FAILED;
661         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
662         goto done;
663     }
664
665     /* lookup clone's salvage scheduling node */
666     clone = LookupNodeByCommand(com->sop, NULL);
667     if (clone == NULL) {
668         code = SYNC_DENIED;
669         res->hdr->reason = SALVSYNC_REASON_ERROR;
670         goto done;
671     }
672
673     /* lookup parent's salvage scheduling node */
674     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
675     if (parent == NULL) {
676         if (AllocNode(&parent)) {
677             code = SYNC_DENIED;
678             res->hdr->reason = SYNC_REASON_NOMEM;
679             goto done;
680         }
681         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
682         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
683         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
684         AddNodeToHash(parent);
685     }
686
687     if (LinkNode(parent, clone)) {
688         code = SYNC_DENIED;
689         goto done;
690     }
691
692  done:
693     return code;
694 }
695
696 /**
697  * query the status of a volume salvage request.
698  *
699  * @param[in]  com   inbound command object
700  * @param[out] res   outbound response object
701  *
702  * @return operation status
703  *    @retval SYNC_OK success
704  *    @retval SYNC_FAILED malformed command packet
705  *
706  * @note this is a SALVSYNC protocol rpc handler
707  *
708  * @internal
709  */
710 static afs_int32
711 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
712 {
713     afs_int32 code = SYNC_OK;
714     struct SalvageQueueNode * node;
715
716     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
717         code = SYNC_FAILED;
718         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
719         goto done;
720     }
721
722     LookupNodeByCommand(com->sop, &node);
723
724     /* query whether a volume is done salvaging */
725     if (node == NULL) {
726         res->sop->state = SALVSYNC_STATE_UNKNOWN;
727         res->sop->prio = 0;
728     } else {
729         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
730         res->sop->state = node->state;
731         res->sop->prio = node->command.sop.prio;
732     }
733
734  done:
735     return code;
736 }
737
738 static void
739 SALVSYNC_Drop(int fd)
740 {
741     RemoveHandler(fd);
742 #ifdef AFS_NT40_ENV
743     closesocket(fd);
744 #else
745     close(fd);
746 #endif
747     AcceptOn();
748 }
749
750 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
751
752 static void
753 AcceptOn(void)
754 {
755     if (AcceptHandler == -1) {
756         assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
757         AcceptHandler = FindHandler(salvsync_server_state.fd);
758     }
759 }
760
761 static void
762 AcceptOff(void)
763 {
764     if (AcceptHandler != -1) {
765         assert(RemoveHandler(salvsync_server_state.fd));
766         AcceptHandler = -1;
767     }
768 }
769
770 /* The multiple FD handling code. */
771
772 static int HandlerFD[MAXHANDLERS];
773 static void (*HandlerProc[MAXHANDLERS]) (int);
774
775 static void
776 InitHandler(void)
777 {
778     register int i;
779     ObtainWriteLock(&SALVSYNC_handler_lock);
780     for (i = 0; i < MAXHANDLERS; i++) {
781         HandlerFD[i] = -1;
782         HandlerProc[i] = NULL;
783     }
784     ReleaseWriteLock(&SALVSYNC_handler_lock);
785 }
786
787 static void
788 CallHandler(fd_set * fdsetp)
789 {
790     register int i;
791     ObtainReadLock(&SALVSYNC_handler_lock);
792     for (i = 0; i < MAXHANDLERS; i++) {
793         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
794             ReleaseReadLock(&SALVSYNC_handler_lock);
795             (*HandlerProc[i]) (HandlerFD[i]);
796             ObtainReadLock(&SALVSYNC_handler_lock);
797         }
798     }
799     ReleaseReadLock(&SALVSYNC_handler_lock);
800 }
801
802 static int
803 AddHandler(int afd, void (*aproc) (int))
804 {
805     register int i;
806     ObtainWriteLock(&SALVSYNC_handler_lock);
807     for (i = 0; i < MAXHANDLERS; i++)
808         if (HandlerFD[i] == -1)
809             break;
810     if (i >= MAXHANDLERS) {
811         ReleaseWriteLock(&SALVSYNC_handler_lock);
812         return 0;
813     }
814     HandlerFD[i] = afd;
815     HandlerProc[i] = aproc;
816     ReleaseWriteLock(&SALVSYNC_handler_lock);
817     return 1;
818 }
819
820 static int
821 FindHandler(register int afd)
822 {
823     register int i;
824     ObtainReadLock(&SALVSYNC_handler_lock);
825     for (i = 0; i < MAXHANDLERS; i++)
826         if (HandlerFD[i] == afd) {
827             ReleaseReadLock(&SALVSYNC_handler_lock);
828             return i;
829         }
830     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
831     assert(1 == 2);
832     return -1;                  /* satisfy compiler */
833 }
834
835 static int
836 FindHandler_r(register int afd)
837 {
838     register int i;
839     for (i = 0; i < MAXHANDLERS; i++)
840         if (HandlerFD[i] == afd) {
841             return i;
842         }
843     assert(1 == 2);
844     return -1;                  /* satisfy compiler */
845 }
846
847 static int
848 RemoveHandler(register int afd)
849 {
850     ObtainWriteLock(&SALVSYNC_handler_lock);
851     HandlerFD[FindHandler_r(afd)] = -1;
852     ReleaseWriteLock(&SALVSYNC_handler_lock);
853     return 1;
854 }
855
856 static void
857 GetHandler(fd_set * fdsetp, int *maxfdp)
858 {
859     register int i;
860     register int maxfd = -1;
861     FD_ZERO(fdsetp);
862     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
863     for (i = 0; i < MAXHANDLERS; i++)
864         if (HandlerFD[i] != -1) {
865             FD_SET(HandlerFD[i], fdsetp);
866             if (maxfd < HandlerFD[i])
867                 maxfd = HandlerFD[i];
868         }
869     *maxfdp = maxfd;
870     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
871 }
872
873 /**
874  * allocate a salvage queue node.
875  *
876  * @param[out] node_out  address in which to store new node pointer
877  *
878  * @return operation status
879  *    @retval 0 success
880  *    @retval 1 failed to allocate node
881  *
882  * @internal
883  */
884 static int
885 AllocNode(struct SalvageQueueNode ** node_out)
886 {
887     int code = 0;
888     struct SalvageQueueNode * node;
889
890     *node_out = node = (struct SalvageQueueNode *) 
891         malloc(sizeof(struct SalvageQueueNode));
892     if (node == NULL) {
893         code = 1;
894         goto done;
895     }
896
897     memset(node, 0, sizeof(struct SalvageQueueNode));
898     node->type = SALVSYNC_VOLGROUP_PARENT;
899     node->state = SALVSYNC_STATE_UNKNOWN;
900
901  done:
902     return code;
903 }
904
905 /**
906  * link a salvage queue node to its parent.
907  *
908  * @param[in] parent  pointer to queue node for parent of volume group
909  * @param[in] clone   pointer to queue node for a clone
910  *
911  * @return operation status
912  *    @retval 0 success
913  *    @retval 1 failure
914  *
915  * @internal
916  */
917 static int
918 LinkNode(struct SalvageQueueNode * parent,
919          struct SalvageQueueNode * clone)
920 {
921     int code = 0;
922     int idx;
923
924     /* check for attaching a clone to a clone */
925     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
926         code = 1;
927         goto done;
928     }
929
930     /* check for pre-existing registration and openings */
931     for (idx = 0; idx < VOLMAXTYPES; idx++) {
932         if (parent->volgroup.children[idx] == clone) {
933             goto linked;
934         }
935         if (parent->volgroup.children[idx] == NULL) {
936             break;
937         }
938     }
939     if (idx == VOLMAXTYPES) {
940         code = 1;
941         goto done;
942     }
943
944     /* link parent and child */
945     parent->volgroup.children[idx] = clone;
946     clone->type = SALVSYNC_VOLGROUP_CLONE;
947     clone->volgroup.parent = parent;
948
949
950  linked:
951     switch (clone->state) {
952     case SALVSYNC_STATE_QUEUED:
953         DeleteFromSalvageQueue(clone);
954
955     case SALVSYNC_STATE_SALVAGING:
956         switch (parent->state) {
957         case SALVSYNC_STATE_UNKNOWN:
958         case SALVSYNC_STATE_ERROR:
959         case SALVSYNC_STATE_DONE:
960             parent->command.sop.prio = clone->command.sop.prio;
961             AddToSalvageQueue(parent);
962             break;
963
964         case SALVSYNC_STATE_QUEUED:
965             if (clone->command.sop.prio) {
966                 parent->command.sop.prio += clone->command.sop.prio;
967                 UpdateCommandPrio(parent);
968             }
969             break;
970
971         default:
972             break;
973         }
974         break;
975
976     default:
977         break;
978     }
979
980  done:
981     return code;
982 }
983
984 static void
985 HandlePrio(struct SalvageQueueNode * clone, 
986            struct SalvageQueueNode * node,
987            afs_uint32 new_prio)
988 {
989     afs_uint32 delta;
990
991     switch (node->state) {
992     case SALVSYNC_STATE_ERROR:
993     case SALVSYNC_STATE_DONE:
994     case SALVSYNC_STATE_UNKNOWN:
995         node->command.sop.prio = 0;
996         break;
997     }
998
999     if (new_prio < clone->command.sop.prio) {
1000         /* strange. let's just set our delta to 1 */
1001         delta = 1;
1002     } else {
1003         delta = new_prio - clone->command.sop.prio;
1004     }
1005
1006     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1007         clone->command.sop.prio = new_prio;
1008     }
1009
1010     node->command.sop.prio += delta;
1011 }
1012
1013 static int
1014 AddToSalvageQueue(struct SalvageQueueNode * node)
1015 {
1016     afs_int32 id;
1017     struct SalvageQueueNode * last = NULL;
1018
1019     id = volutil_GetPartitionID(node->command.sop.partName);
1020     if (id < 0 || id > VOLMAXPARTS) {
1021         return 1;
1022     }
1023     if (!VGetPartitionById_r(id, 0)) {
1024         /* don't enqueue salvage requests for unmounted partitions */
1025         return 1;
1026     }
1027     if (queue_IsOnQueue(node)) {
1028         return 0;
1029     }
1030
1031     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1032         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1033     }
1034     queue_Append(&salvageQueue.part[id], node);
1035     salvageQueue.len[id]++;
1036     salvageQueue.total_len++;
1037     salvageQueue.last_insert = id;
1038     node->partition_id = id;
1039     node->state = SALVSYNC_STATE_QUEUED;
1040
1041     /* reorder, if necessary */
1042     if (last && last->command.sop.prio < node->command.sop.prio) {
1043         UpdateCommandPrio(node);
1044     }
1045
1046     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1047     return 0;
1048 }
1049
1050 static void
1051 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1052 {
1053     if (queue_IsOnQueue(node)) {
1054         queue_Remove(node);
1055         salvageQueue.len[node->partition_id]--;
1056         salvageQueue.total_len--;
1057         node->state = SALVSYNC_STATE_UNKNOWN;
1058         assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
1059     }
1060 }
1061
1062 static void
1063 AddToPendingQueue(struct SalvageQueueNode * node)
1064 {
1065     queue_Append(&pendingQueue, node);
1066     pendingQueue.len++;
1067     node->state = SALVSYNC_STATE_SALVAGING;
1068     assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1069 }
1070
1071 static void
1072 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1073 {
1074     if (queue_IsOnQueue(node)) {
1075         queue_Remove(node);
1076         pendingQueue.len--;
1077         node->state = SALVSYNC_STATE_UNKNOWN;
1078         assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1079     }
1080 }
1081
1082 static struct SalvageQueueNode *
1083 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1084 {
1085     struct SalvageQueueNode * np, * nnp;
1086
1087     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1088         if ((np->command.sop.volume == qry->volume) && 
1089             !strncmp(np->command.sop.partName, qry->partName,
1090                      sizeof(qry->partName)))
1091             break;
1092     }
1093
1094     if (queue_IsEnd(&pendingQueue, np))
1095         np = NULL;
1096     return np;
1097 }
1098
1099 static struct SalvageQueueNode *
1100 LookupPendingCommandByPid(int pid)
1101 {
1102     struct SalvageQueueNode * np, * nnp;
1103
1104     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1105         if (np->pid == pid)
1106             break;
1107     }
1108
1109     if (queue_IsEnd(&pendingQueue, np))
1110         np = NULL;
1111     return np;
1112 }
1113
1114
1115 /* raise the priority of a previously scheduled salvage */
1116 static void
1117 UpdateCommandPrio(struct SalvageQueueNode * node)
1118 {
1119     struct SalvageQueueNode *np, *nnp;
1120     afs_int32 id;
1121     afs_uint32 prio;
1122
1123     assert(queue_IsOnQueue(node));
1124
1125     prio = node->command.sop.prio;
1126     id = node->partition_id;
1127     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1128         queue_Remove(node);
1129         queue_Prepend(&salvageQueue.part[id], node);
1130     } else {
1131         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1132             if (np->command.sop.prio > prio)
1133                 break;
1134         }
1135         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1136             queue_Remove(node);
1137             queue_Prepend(&salvageQueue.part[id], node);
1138         } else if (node != np) {
1139             queue_Remove(node);
1140             queue_InsertAfter(np, node);
1141         }
1142     }
1143 }
1144
1145 /* this will need to be rearchitected if we ever want more than one thread
1146  * to wait for new salvage nodes */
1147 struct SalvageQueueNode * 
1148 SALVSYNC_getWork(void)
1149 {
1150     int i, ret;
1151     struct DiskPartition * dp = NULL, * fdp;
1152     static afs_int32 next_part_sched = 0;
1153     struct SalvageQueueNode *node = NULL, *np;
1154
1155     VOL_LOCK;
1156
1157     /*
1158      * wait for work to be scheduled
1159      * if there are no disk partitions, just sit in this wait loop forever
1160      */
1161     while (!salvageQueue.total_len || !DiskPartitionList) {
1162         VOL_CV_WAIT(&salvageQueue.cv);
1163     }
1164
1165     /* 
1166      * short circuit for simple case where only one partition has
1167      * scheduled salvages
1168      */
1169     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1170         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1171         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1172         goto have_node;
1173     }
1174
1175
1176     /* 
1177      * ok, more than one partition has scheduled salvages.
1178      * now search for partitions with scheduled salvages, but no pending salvages. 
1179      */
1180     dp = VGetPartitionById_r(next_part_sched, 0);
1181     if (!dp) {
1182         dp = DiskPartitionList;
1183     }
1184     fdp = dp;
1185
1186     for (i=0 ; 
1187          !i || dp != fdp ; 
1188          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1189         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1190             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1191             goto have_node;
1192         }
1193     }
1194
1195
1196     /*
1197      * all partitions with scheduled salvages have at least one pending.
1198      * now do an exhaustive search for a scheduled salvage.
1199      */
1200     dp = fdp;
1201
1202     for (i=0 ; 
1203          !i || dp != fdp ; 
1204          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1205         if (salvageQueue.len[dp->index]) {
1206             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1207             goto have_node;
1208         }
1209     }
1210
1211     /* we should never reach this line */
1212     assert(1==2);
1213
1214  have_node:
1215     assert(node != NULL);
1216     node->pid = 0;
1217     partition_salvaging[node->partition_id]++;
1218     DeleteFromSalvageQueue(node);
1219     AddToPendingQueue(node);
1220
1221     if (dp) {
1222         /* update next_part_sched field */
1223         if (dp->next) {
1224             next_part_sched = dp->next->index;
1225         } else if (DiskPartitionList) {
1226             next_part_sched = DiskPartitionList->index;
1227         } else {
1228             next_part_sched = -1;
1229         }
1230     }
1231
1232  bail:
1233     VOL_UNLOCK;
1234     return node;
1235 }
1236
1237 /**
1238  * update internal scheduler state to reflect completion of a work unit.
1239  *
1240  * @param[in]  node    salvage queue node object pointer
1241  * @param[in]  result  worker process result code
1242  *
1243  * @post scheduler state is updated.
1244  *
1245  * @internal
1246  */
1247 static void
1248 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1249 {
1250     afs_int32 partid;
1251     int idx;
1252
1253     DeleteFromPendingQueue(node);
1254     partid = node->partition_id;
1255     if (partid >=0 && partid <= VOLMAXPARTS) {
1256         partition_salvaging[partid]--;
1257     }
1258     if (result == 0) {
1259         node->state = SALVSYNC_STATE_DONE;
1260     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1261         node->state = SALVSYNC_STATE_ERROR;
1262     }
1263
1264     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1265         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1266             if (node->volgroup.children[idx]) {
1267                 node->volgroup.children[idx]->state = node->state;
1268             }
1269         }
1270     }
1271 }
1272
1273 /**
1274  * check whether worker child failed.
1275  *
1276  * @param[in] status  status bitfield return by wait()
1277  *
1278  * @return boolean failure code
1279  *    @retval 0 child succeeded
1280  *    @retval 1 child failed
1281  *
1282  * @internal
1283  */
1284 static int
1285 ChildFailed(int status)
1286 {
1287     return (WCOREDUMP(status) || 
1288             WIFSIGNALED(status) || 
1289             ((WEXITSTATUS(status) != 0) && 
1290              (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1291 }
1292
1293
1294 /**
1295  * notify salvsync scheduler of node completion, by child pid.
1296  *
1297  * @param[in]  pid     pid of worker child
1298  * @param[in]  status  worker status bitfield from wait()
1299  *
1300  * @post scheduler state is updated.
1301  *       if status code is a failure, fileserver notification was attempted
1302  *
1303  * @see SALVSYNC_doneWork_r
1304  */
1305 void
1306 SALVSYNC_doneWorkByPid(int pid, int status)
1307 {
1308     struct SalvageQueueNode * node;
1309     char partName[16];
1310     afs_uint32 volids[VOLMAXTYPES+1];
1311     unsigned int idx;
1312
1313     memset(volids, 0, sizeof(volids));
1314
1315     VOL_LOCK;
1316     node = LookupPendingCommandByPid(pid);
1317     if (node != NULL) {
1318         SALVSYNC_doneWork_r(node, status);
1319
1320         if (ChildFailed(status)) {
1321             /* populate volume id list for later processing outside the glock */
1322             volids[0] = node->command.sop.volume;
1323             strcpy(partName, node->command.sop.partName);
1324             if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1325                 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1326                     if (node->volgroup.children[idx]) {
1327                         volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1328                     }
1329                 }
1330             }
1331         }
1332     }
1333     VOL_UNLOCK;
1334
1335     /*
1336      * if necessary, notify fileserver of
1337      * failure to salvage volume group
1338      * [we cannot guarantee that the child made the
1339      *  appropriate notifications (e.g. SIGSEGV)]
1340      *  -- tkeiser 11/28/2007
1341      */
1342     if (ChildFailed(status)) {
1343         for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1344             if (volids[idx]) {
1345                 FSYNC_VolOp(volids[idx],
1346                             partName,
1347                             FSYNC_VOL_FORCE_ERROR,
1348                             FSYNC_WHATEVER,
1349                             NULL);
1350             }
1351         }
1352     }
1353 }
1354
1355 #endif /* AFS_DEMAND_ATTACH_FS */