dafs-salvage-deal-with-clones-20071101
[openafs.git] / src / vol / salvsync-server.c
1 /*
2  * Copyright 2006-2007, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * salvsync-server.c
12  *
13  * OpenAFS demand attach fileserver
14  * Salvage server synchronization with fileserver.
15  */
16
17 /* This controls the size of an fd_set; it must be defined early before
18  * the system headers define that type and the macros that operate on it.
19  * Its value should be as large as the maximum file descriptor limit we
20  * are likely to run into on any platform.  Right now, that is 65536
21  * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
25
26 #include <afsconfig.h>
27 #include <afs/param.h>
28
29 RCSID
30     ("$Header$");
31
32 #include <sys/types.h>
33 #include <stdio.h>
34 #ifdef AFS_NT40_ENV
35 #include <winsock2.h>
36 #include <time.h>
37 #else
38 #include <sys/param.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <netdb.h>
42 #include <sys/time.h>
43 #endif
44 #include <errno.h>
45 #include <assert.h>
46 #include <signal.h>
47 #include <string.h>
48
49
50 #include <rx/xdr.h>
51 #include <afs/afsint.h>
52 #include "nfs.h"
53 #include <afs/errors.h>
54 #include "salvsync.h"
55 #include "lwp.h"
56 #include "lock.h"
57 #include <afs/afssyscalls.h>
58 #include "ihandle.h"
59 #include "vnode.h"
60 #include "volume.h"
61 #include "partition.h"
62 #include <rx/rx_queue.h>
63
64 #if !defined(offsetof)
65 #include <stddef.h>
66 #endif
67
68 #ifdef USE_UNIX_SOCKETS
69 #include <afs/afsutil.h>
70 #include <sys/un.h>
71 #endif
72
73
74 /*@printflike@*/ extern void Log(const char *format, ...);
75
76 #ifdef osi_Assert
77 #undef osi_Assert
78 #endif
79 #define osi_Assert(e) (void)(e)
80
81 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
82                                  * move = dump+restore can run on single server */
83
84 #define MAX_BIND_TRIES  5       /* Number of times to retry socket bind */
85
86
87
88 /* Forward declarations */
89 static void * SALVSYNC_syncThread(void *);
90 static void SALVSYNC_newconnection(int fd);
91 static void SALVSYNC_com(int fd);
92 static void SALVSYNC_Drop(int fd);
93 static void AcceptOn(void);
94 static void AcceptOff(void);
95 static void InitHandler(void);
96 static void CallHandler(fd_set * fdsetp);
97 static int AddHandler(int afd, void (*aproc) (int));
98 static int FindHandler(register int afd);
99 static int FindHandler_r(register int afd);
100 static int RemoveHandler(register int afd);
101 static void GetHandler(fd_set * fdsetp, int *maxfdp);
102
103
104 /*
105  * This lock controls access to the handler array.
106  */
107 struct Lock SALVSYNC_handler_lock;
108
109
110 #ifdef AFS_DEMAND_ATTACH_FS
111 /*
112  * SALVSYNC is a feature specific to the demand attach fileserver
113  */
114
115 static int AllocNode(struct SalvageQueueNode ** node);
116
117 static int AddToSalvageQueue(struct SalvageQueueNode * node);
118 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
119 static void AddToPendingQueue(struct SalvageQueueNode * node);
120 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
121 static struct SalvageQueueNode * LookupPendingCommand(SALVSYNC_command_hdr * qry);
122 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
123 static void UpdateCommandPrio(struct SalvageQueueNode * node);
124 static void HandlePrio(struct SalvageQueueNode * clone, 
125                        struct SalvageQueueNode * parent,
126                        afs_uint32 new_prio);
127
128 static int LinkNode(struct SalvageQueueNode * parent,
129                     struct SalvageQueueNode * clone);
130
131 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName, 
132                                             struct SalvageQueueNode ** parent);
133 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
134                                                      struct SalvageQueueNode ** parent);
135 static void AddNodeToHash(struct SalvageQueueNode * node);
136 static void DeleteNodeFromHash(struct SalvageQueueNode * node);
137
138 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
139 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
140 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
141 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
142 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
143
144
145 extern int LogLevel;
146 extern int VInit;
147 extern pthread_mutex_t vol_salvsync_mutex;
148
149 static int AcceptSd = -1;               /* Socket used by server for accepting connections */
150
151
152 /**
153  * queue of all volumes waiting to be salvaged.
154  */
155 struct SalvageQueue {
156     volatile int total_len;
157     volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
158     volatile int len[VOLMAXPARTS+1];
159     volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
160     pthread_cond_t cv;
161 };
162 static struct SalvageQueue salvageQueue;  /* volumes waiting to be salvaged */
163
164 /**
165  * queue of all volumes currently being salvaged.
166  */
167 struct QueueHead {
168     volatile struct rx_queue q;  /**< queue of salvages in progress */
169     volatile int len;            /**< length of in-progress queue */
170     pthread_cond_t queue_change_cv;
171 };
172 static struct QueueHead pendingQueue;  /* volumes being salvaged */
173
174 /* XXX
175  * whether a partition has a salvage in progress
176  *
177  * the salvager code only permits one salvage per partition at a time
178  *
179  * the following hack tries to keep salvaged parallelism high by
180  * only permitting one salvage dispatch per partition at a time
181  *
182  * unfortunately, the parallel salvager currently
183  * has a rather braindead routine that won't permit
184  * multiple salvages on the same "device".  this
185  * function happens to break pretty badly on lvm, raid luns, etc.
186  *
187  * this hack isn't good enough to stop the device limiting code from
188  * crippling performance.  someday that code needs to be rewritten
189  */
190 static int partition_salvaging[VOLMAXPARTS+1];
191
192 #define VSHASH_SIZE 64
193 #define VSHASH_MASK (VSHASH_SIZE-1)
194 #define VSHASH(vid) ((vid)&VSHASH_MASK)
195
196 static struct QueueHead  SalvageHashTable[VSHASH_SIZE];
197
198 static struct SalvageQueueNode *
199 LookupNode(afs_uint32 vid, char * partName,
200            struct SalvageQueueNode ** parent)
201 {
202     struct rx_queue *qp, *nqp;
203     struct SalvageQueueNode *vsp;
204     int idx = VSHASH(vid);
205
206     for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
207         vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
208         if ((vsp->command.sop.volume == vid) &&
209             !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
210             break;
211         }
212     }
213
214     if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
215         vsp = NULL;
216     }
217
218     if (parent) {
219         if (vsp) {
220             *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
221                 vsp->volgroup.parent : vsp;
222         } else {
223             *parent = NULL;
224         }
225     }
226
227     return vsp;
228 }
229
230 static struct SalvageQueueNode *
231 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
232                     struct SalvageQueueNode ** parent)
233 {
234     return LookupNode(qry->volume, qry->partName, parent);
235 }
236
237 static void
238 AddNodeToHash(struct SalvageQueueNode * node)
239 {
240     int idx = VSHASH(node->command.sop.volume);
241
242     if (queue_IsOnQueue(&node->hash_chain)) {
243         return;
244     }
245
246     queue_Append(&SalvageHashTable[idx], &node->hash_chain);
247     SalvageHashTable[idx].len++;
248 }
249
250 static void
251 DeleteNodeFromHash(struct SalvageQueueNode * node)
252 {
253     int idx = VSHASH(node->command.sop.volume);
254
255     if (queue_IsNotOnQueue(&node->hash_chain)) {
256         return;
257     }
258
259     queue_Remove(&node->hash_chain);
260     SalvageHashTable[idx].len--;
261 }
262
263 void
264 SALVSYNC_salvInit(void)
265 {
266     int i;
267     pthread_t tid;
268     pthread_attr_t tattr;
269
270     /* initialize the queues */
271     assert(pthread_cond_init(&salvageQueue.cv, NULL) == 0);
272     for (i = 0; i <= VOLMAXPARTS; i++) {
273         queue_Init(&salvageQueue.part[i]);
274         salvageQueue.len[i] = 0;
275     }
276     assert(pthread_cond_init(&pendingQueue.queue_change_cv, NULL) == 0);
277     queue_Init(&pendingQueue);
278     salvageQueue.total_len = pendingQueue.len = 0;
279     salvageQueue.last_insert = -1;
280     memset(partition_salvaging, 0, sizeof(partition_salvaging));
281
282     for (i = 0; i < VSHASH_SIZE; i++) {
283         assert(pthread_cond_init(&SalvageHashTable[i].queue_change_cv, NULL) == 0);
284         SalvageHashTable[i].len = 0;
285         queue_Init(&SalvageHashTable[i]);
286     }
287
288     /* start the salvsync thread */
289     assert(pthread_attr_init(&tattr) == 0);
290     assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
291     assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
292 }
293
294 #ifdef USE_UNIX_SOCKETS
295 static int
296 getport(struct sockaddr_un *addr)
297 {
298     int sd;
299     char tbuffer[AFSDIR_PATH_MAX]; 
300     
301     strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
302                "fssync.sock", NULL);
303     
304     memset(addr, 0, sizeof(*addr));
305     addr->sun_family = AF_UNIX;
306     strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
307     assert((sd = socket(AF_UNIX, SOCK_STREAM, 0)) >= 0);
308     return sd;
309 }
310 #else
311 static int
312 getport(struct sockaddr_in *addr)
313 {
314     int sd;
315
316     memset(addr, 0, sizeof(*addr));
317     assert((sd = socket(AF_INET, SOCK_STREAM, 0)) >= 0);
318 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
319     addr->sin_len = sizeof(struct sockaddr_in);
320 #endif
321     addr->sin_addr.s_addr = htonl(0x7f000001);
322     addr->sin_family = AF_INET; /* was localhost->h_addrtype */
323     addr->sin_port = htons(2041);       /* XXXX htons not _really_ neccessary */
324
325     return sd;
326 }
327 #endif
328
329 static fd_set SALVSYNC_readfds;
330
331 static void *
332 SALVSYNC_syncThread(void * args)
333 {
334     struct sockaddr_in addr;
335     int on = 1;
336     int code;
337     int numTries;
338     int tid;
339 #ifdef USE_UNIX_SOCKETS
340     char tbuffer[AFSDIR_PATH_MAX]; 
341 #endif
342
343 #ifndef AFS_NT40_ENV
344     (void)signal(SIGPIPE, SIG_IGN);
345 #endif
346
347     /* set our 'thread-id' so that the host hold table works */
348     MUTEX_ENTER(&rx_stats_mutex);       /* protects rxi_pthread_hinum */
349     tid = ++rxi_pthread_hinum;
350     MUTEX_EXIT(&rx_stats_mutex);
351     pthread_setspecific(rx_thread_id_key, (void *)tid);
352     Log("Set thread id %d for SALVSYNC_syncThread\n", tid);
353
354 #ifdef USE_UNIX_SOCKETS
355     strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
356                "fssync.sock", NULL);
357     /* ignore errors */
358     remove(tbuffer);
359 #endif /* USE_UNIX_SOCKETS */
360
361     AcceptSd = getport(&addr);
362     /* Reuseaddr needed because system inexplicably leaves crud lying around */
363     code =
364         setsockopt(AcceptSd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
365                    sizeof(on));
366     if (code)
367         Log("SALVSYNC_sync: setsockopt failed with (%d)\n", errno);
368
369     for (numTries = 0; numTries < MAX_BIND_TRIES; numTries++) {
370         if ((code =
371              bind(AcceptSd, (struct sockaddr *)&addr, sizeof(addr))) == 0)
372             break;
373         Log("SALVSYNC_sync: bind failed with (%d), will sleep and retry\n",
374             errno);
375         sleep(5);
376     }
377     assert(!code);
378     listen(AcceptSd, 100);
379     InitHandler();
380     AcceptOn();
381
382     for (;;) {
383         int maxfd;
384         GetHandler(&SALVSYNC_readfds, &maxfd);
385         /* Note: check for >= 1 below is essential since IOMGR_select
386          * doesn't have exactly same semantics as select.
387          */
388         if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, NULL) >= 1)
389             CallHandler(&SALVSYNC_readfds);
390     }
391
392     return NULL;
393 }
394
395 static void
396 SALVSYNC_newconnection(int afd)
397 {
398 #ifdef USE_UNIX_SOCKETS
399     struct sockaddr_un other;
400 #else  /* USE_UNIX_SOCKETS */
401     struct sockaddr_in other;
402 #endif
403     int junk, fd;
404     junk = sizeof(other);
405     fd = accept(afd, (struct sockaddr *)&other, &junk);
406     if (fd == -1) {
407         Log("SALVSYNC_newconnection:  accept failed, errno==%d\n", errno);
408         assert(1 == 2);
409     } else if (!AddHandler(fd, SALVSYNC_com)) {
410         AcceptOff();
411         assert(AddHandler(fd, SALVSYNC_com));
412     }
413 }
414
415 /* this function processes commands from an salvsync file descriptor (fd) */
416 static afs_int32 SALV_cnt = 0;
417 static void
418 SALVSYNC_com(int fd)
419 {
420     SYNC_command com;
421     SYNC_response res;
422     SALVSYNC_response_hdr sres_hdr;
423     SALVSYNC_command scom;
424     SALVSYNC_response sres;
425     SYNC_PROTO_BUF_DECL(buf);
426     
427     com.payload.buf = (void *)buf;
428     com.payload.len = SYNC_PROTO_MAX_LEN;
429     res.payload.buf = (void *) &sres_hdr;
430     res.payload.len = sizeof(sres_hdr);
431     res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
432     res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
433
434     scom.hdr = &com.hdr;
435     scom.sop = (SALVSYNC_command_hdr *) buf;
436     scom.com = &com;
437     sres.hdr = &res.hdr;
438     sres.sop = &sres_hdr;
439     sres.res = &res;
440
441     SALV_cnt++;
442     if (SYNC_getCom(fd, &com)) {
443         Log("SALVSYNC_com:  read failed; dropping connection (cnt=%d)\n", SALV_cnt);
444         SALVSYNC_Drop(fd);
445         return;
446     }
447
448     if (com.recv_len < sizeof(com.hdr)) {
449         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
450         res.hdr.response = SYNC_COM_ERROR;
451         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
452         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
453         goto respond;
454     }
455
456     if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
457         Log("SALVSYNC_com:  invalid protocol version (%u)\n", com.hdr.proto_version);
458         res.hdr.response = SYNC_COM_ERROR;
459         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
460         goto respond;
461     }
462
463     if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
464         res.hdr.response = SYNC_OK;
465         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
466         goto respond;
467     }
468
469     if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
470         Log("SALVSYNC_com:  invalid protocol message length (%u)\n", com.recv_len);
471         res.hdr.response = SYNC_COM_ERROR;
472         res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
473         res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
474         goto respond;
475     }
476
477     VOL_LOCK;
478     switch (com.hdr.command) {
479     case SALVSYNC_NOP:
480         break;
481     case SALVSYNC_SALVAGE:
482     case SALVSYNC_RAISEPRIO:
483         res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
484         break;
485     case SALVSYNC_CANCEL:
486         /* cancel a salvage */
487         res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
488         break;
489     case SALVSYNC_CANCELALL:
490         /* cancel all queued salvages */
491         res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
492         break;
493     case SALVSYNC_QUERY:
494         /* query whether a volume is done salvaging */
495         res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
496         break;
497     case SALVSYNC_OP_LINK:
498         /* link a clone to its parent in the scheduler */
499         res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
500         break;
501     default:
502         res.hdr.response = SYNC_BAD_COMMAND;
503         break;
504     }
505
506     sres_hdr.sq_len = salvageQueue.total_len;
507     sres_hdr.pq_len = pendingQueue.len;
508     VOL_UNLOCK;
509
510  respond:
511     SYNC_putRes(fd, &res);
512     if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
513         SALVSYNC_Drop(fd);
514     }
515 }
516
517 static afs_int32
518 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
519 {
520     afs_int32 code = SYNC_OK;
521     struct SalvageQueueNode * node, * clone;
522     int hash = 0;
523
524     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
525         code = SYNC_FAILED;
526         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
527         goto done;
528     }
529
530     clone = LookupNodeByCommand(com->sop, &node);
531
532     if (node == NULL) {
533         if (AllocNode(&node)) {
534             code = SYNC_DENIED;
535             res->hdr->reason = SYNC_REASON_NOMEM;
536             goto done;
537         }
538         clone = node;
539         hash = 1;
540     }
541
542     HandlePrio(clone, node, com->sop->prio);
543
544     switch (node->state) {
545     case SALVSYNC_STATE_QUEUED:
546         UpdateCommandPrio(node);
547         break;
548
549     case SALVSYNC_STATE_ERROR:
550     case SALVSYNC_STATE_DONE:
551     case SALVSYNC_STATE_UNKNOWN:
552         memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
553         memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
554         if (AddToSalvageQueue(node)) {
555             code = SYNC_DENIED;
556         }
557         break;
558
559     default:
560         break;
561     }
562
563     if (hash) {
564         AddNodeToHash(node);
565     }
566
567     res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
568     res->sop->state = node->state;
569     res->sop->prio = node->command.sop.prio;
570
571  done:
572     return code;
573 }
574
575 static afs_int32
576 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
577 {
578     afs_int32 code = SYNC_OK;
579     struct SalvageQueueNode * node;
580
581     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
582         code = SYNC_FAILED;
583         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
584         goto done;
585     }
586
587     node = LookupNodeByCommand(com->sop, NULL);
588
589     if (node == NULL) {
590         res->sop->state = SALVSYNC_STATE_UNKNOWN;
591         res->sop->prio = 0;
592     } else {
593         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
594         res->sop->prio = node->command.sop.prio;
595         res->sop->state = node->state;
596         if ((node->type == SALVSYNC_VOLGROUP_PARENT) && 
597             (node->state == SALVSYNC_STATE_QUEUED)) {
598             DeleteFromSalvageQueue(node);
599         }
600     }
601
602  done:
603     return code;
604 }
605
606 static afs_int32
607 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
608 {
609     struct SalvageQueueNode * np, *nnp;
610     struct DiskPartition * dp;
611
612     for (dp = DiskPartitionList ; dp ; dp = dp->next) {
613         for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
614             DeleteFromSalvageQueue(np);
615         }
616     }
617
618     return SYNC_OK;
619 }
620
621 /**
622  * link a queue node for a clone to its parent volume.
623  */
624 static afs_int32
625 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
626 {
627     afs_int32 code = SYNC_OK;
628     struct SalvageQueueNode * clone, * parent;
629
630     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
631         code = SYNC_FAILED;
632         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
633         goto done;
634     }
635
636     /* lookup clone's salvage scheduling node */
637     clone = LookupNodeByCommand(com->sop, NULL);
638     if (clone == NULL) {
639         code = SYNC_DENIED;
640         res->hdr->reason = SALVSYNC_REASON_ERROR;
641         goto done;
642     }
643
644     /* lookup parent's salvage scheduling node */
645     parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
646     if (parent == NULL) {
647         if (AllocNode(&parent)) {
648             code = SYNC_DENIED;
649             res->hdr->reason = SYNC_REASON_NOMEM;
650             goto done;
651         }
652         memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
653         memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
654         parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
655         AddNodeToHash(parent);
656     }
657
658     if (LinkNode(parent, clone)) {
659         code = SYNC_DENIED;
660         goto done;
661     }
662
663  done:
664     return code;
665 }
666
667 static afs_int32
668 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
669 {
670     afs_int32 code = SYNC_OK;
671     struct SalvageQueueNode * node;
672
673     if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
674         code = SYNC_FAILED;
675         res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
676         goto done;
677     }
678
679     LookupNodeByCommand(com->sop, &node);
680
681     /* query whether a volume is done salvaging */
682     if (node == NULL) {
683         res->sop->state = SALVSYNC_STATE_UNKNOWN;
684         res->sop->prio = 0;
685     } else {
686         res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
687         res->sop->state = node->state;
688         res->sop->prio = node->command.sop.prio;
689     }
690
691  done:
692     return code;
693 }
694
695 static void
696 SALVSYNC_Drop(int fd)
697 {
698     RemoveHandler(fd);
699 #ifdef AFS_NT40_ENV
700     closesocket(fd);
701 #else
702     close(fd);
703 #endif
704     AcceptOn();
705 }
706
707 static int AcceptHandler = -1;  /* handler id for accept, if turned on */
708
709 static void
710 AcceptOn(void)
711 {
712     if (AcceptHandler == -1) {
713         assert(AddHandler(AcceptSd, SALVSYNC_newconnection));
714         AcceptHandler = FindHandler(AcceptSd);
715     }
716 }
717
718 static void
719 AcceptOff(void)
720 {
721     if (AcceptHandler != -1) {
722         assert(RemoveHandler(AcceptSd));
723         AcceptHandler = -1;
724     }
725 }
726
727 /* The multiple FD handling code. */
728
729 static int HandlerFD[MAXHANDLERS];
730 static void (*HandlerProc[MAXHANDLERS]) (int);
731
732 static void
733 InitHandler(void)
734 {
735     register int i;
736     ObtainWriteLock(&SALVSYNC_handler_lock);
737     for (i = 0; i < MAXHANDLERS; i++) {
738         HandlerFD[i] = -1;
739         HandlerProc[i] = NULL;
740     }
741     ReleaseWriteLock(&SALVSYNC_handler_lock);
742 }
743
744 static void
745 CallHandler(fd_set * fdsetp)
746 {
747     register int i;
748     ObtainReadLock(&SALVSYNC_handler_lock);
749     for (i = 0; i < MAXHANDLERS; i++) {
750         if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
751             ReleaseReadLock(&SALVSYNC_handler_lock);
752             (*HandlerProc[i]) (HandlerFD[i]);
753             ObtainReadLock(&SALVSYNC_handler_lock);
754         }
755     }
756     ReleaseReadLock(&SALVSYNC_handler_lock);
757 }
758
759 static int
760 AddHandler(int afd, void (*aproc) (int))
761 {
762     register int i;
763     ObtainWriteLock(&SALVSYNC_handler_lock);
764     for (i = 0; i < MAXHANDLERS; i++)
765         if (HandlerFD[i] == -1)
766             break;
767     if (i >= MAXHANDLERS) {
768         ReleaseWriteLock(&SALVSYNC_handler_lock);
769         return 0;
770     }
771     HandlerFD[i] = afd;
772     HandlerProc[i] = aproc;
773     ReleaseWriteLock(&SALVSYNC_handler_lock);
774     return 1;
775 }
776
777 static int
778 FindHandler(register int afd)
779 {
780     register int i;
781     ObtainReadLock(&SALVSYNC_handler_lock);
782     for (i = 0; i < MAXHANDLERS; i++)
783         if (HandlerFD[i] == afd) {
784             ReleaseReadLock(&SALVSYNC_handler_lock);
785             return i;
786         }
787     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
788     assert(1 == 2);
789     return -1;                  /* satisfy compiler */
790 }
791
792 static int
793 FindHandler_r(register int afd)
794 {
795     register int i;
796     for (i = 0; i < MAXHANDLERS; i++)
797         if (HandlerFD[i] == afd) {
798             return i;
799         }
800     assert(1 == 2);
801     return -1;                  /* satisfy compiler */
802 }
803
804 static int
805 RemoveHandler(register int afd)
806 {
807     ObtainWriteLock(&SALVSYNC_handler_lock);
808     HandlerFD[FindHandler_r(afd)] = -1;
809     ReleaseWriteLock(&SALVSYNC_handler_lock);
810     return 1;
811 }
812
813 static void
814 GetHandler(fd_set * fdsetp, int *maxfdp)
815 {
816     register int i;
817     register int maxfd = -1;
818     FD_ZERO(fdsetp);
819     ObtainReadLock(&SALVSYNC_handler_lock);     /* just in case */
820     for (i = 0; i < MAXHANDLERS; i++)
821         if (HandlerFD[i] != -1) {
822             FD_SET(HandlerFD[i], fdsetp);
823             if (maxfd < HandlerFD[i])
824                 maxfd = HandlerFD[i];
825         }
826     *maxfdp = maxfd;
827     ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
828 }
829
830 static int
831 AllocNode(struct SalvageQueueNode ** node_out)
832 {
833     int code = 0;
834     struct SalvageQueueNode * node;
835
836     *node_out = node = (struct SalvageQueueNode *) 
837         malloc(sizeof(struct SalvageQueueNode));
838     if (node == NULL) {
839         code = 1;
840         goto done;
841     }
842
843     memset(node, 0, sizeof(struct SalvageQueueNode));
844     node->type = SALVSYNC_VOLGROUP_PARENT;
845     node->state = SALVSYNC_STATE_UNKNOWN;
846
847  done:
848     return code;
849 }
850
851 static int
852 LinkNode(struct SalvageQueueNode * parent,
853          struct SalvageQueueNode * clone)
854 {
855     int code = 0;
856     int idx;
857
858     /* check for attaching a clone to a clone */
859     if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
860         code = 1;
861         goto done;
862     }
863
864     /* check for pre-existing registration and openings */
865     for (idx = 0; idx < VOLMAXTYPES; idx++) {
866         if (parent->volgroup.children[idx] == clone) {
867             goto linked;
868         }
869         if (parent->volgroup.children[idx] == NULL) {
870             break;
871         }
872     }
873     if (idx == VOLMAXTYPES) {
874         code = 1;
875         goto done;
876     }
877
878     /* link parent and child */
879     parent->volgroup.children[idx] = clone;
880     clone->type = SALVSYNC_VOLGROUP_CLONE;
881     clone->volgroup.parent = parent;
882
883
884  linked:
885     switch (clone->state) {
886     case SALVSYNC_STATE_QUEUED:
887         DeleteFromSalvageQueue(clone);
888
889     case SALVSYNC_STATE_SALVAGING:
890         switch (parent->state) {
891         case SALVSYNC_STATE_UNKNOWN:
892         case SALVSYNC_STATE_ERROR:
893         case SALVSYNC_STATE_DONE:
894             parent->command.sop.prio = clone->command.sop.prio;
895             AddToSalvageQueue(parent);
896             break;
897
898         case SALVSYNC_STATE_QUEUED:
899             if (clone->command.sop.prio) {
900                 parent->command.sop.prio += clone->command.sop.prio;
901                 UpdateCommandPrio(parent);
902             }
903             break;
904
905         default:
906             break;
907         }
908         break;
909
910     default:
911         break;
912     }
913
914  done:
915     return code;
916 }
917
918 static void
919 HandlePrio(struct SalvageQueueNode * clone, 
920            struct SalvageQueueNode * node,
921            afs_uint32 new_prio)
922 {
923     afs_uint32 delta;
924
925     switch (node->state) {
926     case SALVSYNC_STATE_ERROR:
927     case SALVSYNC_STATE_DONE:
928     case SALVSYNC_STATE_UNKNOWN:
929         node->command.sop.prio = 0;
930         break;
931     }
932
933     if (new_prio < clone->command.sop.prio) {
934         /* strange. let's just set our delta to 1 */
935         delta = 1;
936     } else {
937         delta = new_prio - clone->command.sop.prio;
938     }
939
940     if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
941         clone->command.sop.prio = new_prio;
942     }
943
944     node->command.sop.prio += delta;
945 }
946
947 static int
948 AddToSalvageQueue(struct SalvageQueueNode * node)
949 {
950     afs_int32 id;
951     struct SalvageQueueNode * last = NULL;
952
953     id = volutil_GetPartitionID(node->command.sop.partName);
954     if (id < 0 || id > VOLMAXPARTS) {
955         return 1;
956     }
957     if (!VGetPartitionById_r(id, 0)) {
958         /* don't enqueue salvage requests for unmounted partitions */
959         return 1;
960     }
961     if (queue_IsOnQueue(node)) {
962         return 0;
963     }
964
965     if (queue_IsNotEmpty(&salvageQueue.part[id])) {
966         last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
967     }
968     queue_Append(&salvageQueue.part[id], node);
969     salvageQueue.len[id]++;
970     salvageQueue.total_len++;
971     salvageQueue.last_insert = id;
972     node->partition_id = id;
973     node->state = SALVSYNC_STATE_QUEUED;
974
975     /* reorder, if necessary */
976     if (last && last->command.sop.prio < node->command.sop.prio) {
977         UpdateCommandPrio(node);
978     }
979
980     assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
981     return 0;
982 }
983
984 static void
985 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
986 {
987     if (queue_IsOnQueue(node)) {
988         queue_Remove(node);
989         salvageQueue.len[node->partition_id]--;
990         salvageQueue.total_len--;
991         node->state = SALVSYNC_STATE_UNKNOWN;
992         assert(pthread_cond_broadcast(&salvageQueue.cv) == 0);
993     }
994 }
995
996 static void
997 AddToPendingQueue(struct SalvageQueueNode * node)
998 {
999     queue_Append(&pendingQueue, node);
1000     pendingQueue.len++;
1001     node->state = SALVSYNC_STATE_SALVAGING;
1002     assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1003 }
1004
1005 static void
1006 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1007 {
1008     if (queue_IsOnQueue(node)) {
1009         queue_Remove(node);
1010         pendingQueue.len--;
1011         node->state = SALVSYNC_STATE_UNKNOWN;
1012         assert(pthread_cond_broadcast(&pendingQueue.queue_change_cv) == 0);
1013     }
1014 }
1015
1016 static struct SalvageQueueNode *
1017 LookupPendingCommand(SALVSYNC_command_hdr * qry)
1018 {
1019     struct SalvageQueueNode * np, * nnp;
1020
1021     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1022         if ((np->command.sop.volume == qry->volume) && 
1023             !strncmp(np->command.sop.partName, qry->partName,
1024                      sizeof(qry->partName)))
1025             break;
1026     }
1027
1028     if (queue_IsEnd(&pendingQueue, np))
1029         np = NULL;
1030     return np;
1031 }
1032
1033 static struct SalvageQueueNode *
1034 LookupPendingCommandByPid(int pid)
1035 {
1036     struct SalvageQueueNode * np, * nnp;
1037
1038     for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1039         if (np->pid == pid)
1040             break;
1041     }
1042
1043     if (queue_IsEnd(&pendingQueue, np))
1044         np = NULL;
1045     return np;
1046 }
1047
1048
1049 /* raise the priority of a previously scheduled salvage */
1050 static void
1051 UpdateCommandPrio(struct SalvageQueueNode * node)
1052 {
1053     struct SalvageQueueNode *np, *nnp;
1054     afs_int32 id;
1055     afs_uint32 prio;
1056
1057     assert(queue_IsOnQueue(node));
1058
1059     prio = node->command.sop.prio;
1060     id = node->partition_id;
1061     if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1062         queue_Remove(node);
1063         queue_Prepend(&salvageQueue.part[id], node);
1064     } else {
1065         for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1066             if (np->command.sop.prio > prio)
1067                 break;
1068         }
1069         if (queue_IsEnd(&salvageQueue.part[id], np)) {
1070             queue_Remove(node);
1071             queue_Prepend(&salvageQueue.part[id], node);
1072         } else if (node != np) {
1073             queue_Remove(node);
1074             queue_InsertAfter(np, node);
1075         }
1076     }
1077 }
1078
1079 /* this will need to be rearchitected if we ever want more than one thread
1080  * to wait for new salvage nodes */
1081 struct SalvageQueueNode * 
1082 SALVSYNC_getWork(void)
1083 {
1084     int i, ret;
1085     struct DiskPartition * dp = NULL, * fdp;
1086     static afs_int32 next_part_sched = 0;
1087     struct SalvageQueueNode *node = NULL, *np;
1088
1089     VOL_LOCK;
1090
1091     /*
1092      * wait for work to be scheduled
1093      * if there are no disk partitions, just sit in this wait loop forever
1094      */
1095     while (!salvageQueue.total_len || !DiskPartitionList) {
1096       assert(pthread_cond_wait(&salvageQueue.cv, &vol_glock_mutex) == 0);
1097     }
1098
1099     /* 
1100      * short circuit for simple case where only one partition has
1101      * scheduled salvages
1102      */
1103     if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1104         (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1105         node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1106         goto have_node;
1107     }
1108
1109
1110     /* 
1111      * ok, more than one partition has scheduled salvages.
1112      * now search for partitions with scheduled salvages, but no pending salvages. 
1113      */
1114     dp = VGetPartitionById_r(next_part_sched, 0);
1115     if (!dp) {
1116         dp = DiskPartitionList;
1117     }
1118     fdp = dp;
1119
1120     for (i=0 ; 
1121          !i || dp != fdp ; 
1122          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1123         if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1124             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1125             goto have_node;
1126         }
1127     }
1128
1129
1130     /*
1131      * all partitions with scheduled salvages have at least one pending.
1132      * now do an exhaustive search for a scheduled salvage.
1133      */
1134     dp = fdp;
1135
1136     for (i=0 ; 
1137          !i || dp != fdp ; 
1138          dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1139         if (salvageQueue.len[dp->index]) {
1140             node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1141             goto have_node;
1142         }
1143     }
1144
1145     /* we should never reach this line */
1146     assert(1==2);
1147
1148  have_node:
1149     assert(node != NULL);
1150     node->pid = 0;
1151     partition_salvaging[node->partition_id]++;
1152     DeleteFromSalvageQueue(node);
1153     AddToPendingQueue(node);
1154
1155     if (dp) {
1156         /* update next_part_sched field */
1157         if (dp->next) {
1158             next_part_sched = dp->next->index;
1159         } else if (DiskPartitionList) {
1160             next_part_sched = DiskPartitionList->index;
1161         } else {
1162             next_part_sched = -1;
1163         }
1164     }
1165
1166  bail:
1167     VOL_UNLOCK;
1168     return node;
1169 }
1170
1171 static void
1172 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1173 {
1174     afs_int32 partid;
1175     int idx;
1176
1177     DeleteFromPendingQueue(node);
1178     partid = node->partition_id;
1179     if (partid >=0 && partid <= VOLMAXPARTS) {
1180         partition_salvaging[partid]--;
1181     }
1182     if (result == 0) {
1183         node->state = SALVSYNC_STATE_DONE;
1184     } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1185         node->state = SALVSYNC_STATE_ERROR;
1186     }
1187
1188     if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1189         for (idx = 0; idx < VOLMAXTYPES; idx++) {
1190             if (node->volgroup.children[idx]) {
1191                 node->volgroup.children[idx]->state = node->state;
1192             }
1193         }
1194     }
1195 }
1196
1197 void 
1198 SALVSYNC_doneWork(struct SalvageQueueNode * node, int result)
1199 {
1200     VOL_LOCK;
1201     SALVSYNC_doneWork_r(node, result);
1202     VOL_UNLOCK;
1203 }
1204
1205 void
1206 SALVSYNC_doneWorkByPid(int pid, int result)
1207 {
1208     struct SalvageQueueNode * node;
1209
1210     VOL_LOCK;
1211     node = LookupPendingCommandByPid(pid);
1212     if (node != NULL) {
1213         SALVSYNC_doneWork_r(node, result);
1214     }
1215     VOL_UNLOCK;
1216 }
1217
1218 #endif /* AFS_DEMAND_ATTACH_FS */