a3175a2dfc9ef06006bc8443fedcc34d45310869
[openafs.git] / src / vol / volume.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  *
9  * Portions Copyright (c) 2005-2008 Sine Nomine Associates
10  */
11
12 /* 1/1/89: NB:  this stuff is all going to be replaced.  Don't take it too seriously */
13 /*
14
15         System:         VICE-TWO
16         Module:         volume.c
17         Institution:    The Information Technology Center, Carnegie-Mellon University
18
19  */
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25 #include <afs/opr.h>
26
27 #include <ctype.h>
28 #include <stddef.h>
29
30 #ifdef HAVE_SYS_FILE_H
31 #include <sys/file.h>
32 #endif
33
34 #ifdef AFS_PTHREAD_ENV
35 # include <opr/lock.h>
36 #else
37 # include <opr/lockstub.h>
38 #endif
39 #include <opr/ffs.h>
40 #include <opr/jhash.h>
41
42 #include <afs/afsint.h>
43
44 #include <rx/rx_queue.h>
45
46 #ifndef AFS_NT40_ENV
47 #if !defined(AFS_SGI_ENV)
48 #ifdef  AFS_OSF_ENV
49 #include <ufs/fs.h>
50 #else /* AFS_OSF_ENV */
51 #ifdef AFS_VFSINCL_ENV
52 #define VFS
53 #ifdef  AFS_SUN5_ENV
54 #include <sys/fs/ufs_fs.h>
55 #else
56 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
57 #include <ufs/ufs/dinode.h>
58 #include <ufs/ffs/fs.h>
59 #else
60 #include <ufs/fs.h>
61 #endif
62 #endif
63 #else /* AFS_VFSINCL_ENV */
64 #if !defined(AFS_AIX_ENV) && !defined(AFS_LINUX20_ENV) && !defined(AFS_XBSD_ENV) && !defined(AFS_DARWIN_ENV)
65 #include <sys/fs.h>
66 #endif
67 #endif /* AFS_VFSINCL_ENV */
68 #endif /* AFS_OSF_ENV */
69 #endif /* AFS_SGI_ENV */
70 #endif /* !AFS_NT40_ENV */
71
72 #ifdef  AFS_AIX_ENV
73 #include <sys/vfs.h>
74 #else
75 #ifdef  AFS_HPUX_ENV
76 #include <mntent.h>
77 #else
78 #if     defined(AFS_SUN_ENV) || defined(AFS_SUN5_ENV)
79 #ifdef  AFS_SUN5_ENV
80 #include <sys/mnttab.h>
81 #include <sys/mntent.h>
82 #else
83 #include <mntent.h>
84 #endif
85 #else
86 #ifndef AFS_NT40_ENV
87 #if defined(AFS_SGI_ENV)
88 #include <mntent.h>
89 #else
90 #ifndef AFS_LINUX20_ENV
91 #include <fstab.h>              /* Need to find in libc 5, present in libc 6 */
92 #endif
93 #endif
94 #endif /* AFS_SGI_ENV */
95 #endif
96 #endif /* AFS_HPUX_ENV */
97 #endif
98
99 #include "nfs.h"
100 #include <afs/errors.h>
101 #include "lock.h"
102 #include "lwp.h"
103 #include <afs/afssyscalls.h>
104 #include "ihandle.h"
105 #include <afs/afsutil.h>
106 #include "daemon_com.h"
107 #include "fssync.h"
108 #include "salvsync.h"
109 #include "vnode.h"
110 #include "volume.h"
111 #include "partition.h"
112 #include "volume_inline.h"
113 #include "common.h"
114 #include "vutils.h"
115 #include <afs/dir.h>
116
117 #ifdef AFS_PTHREAD_ENV
118 pthread_mutex_t vol_glock_mutex;
119 pthread_mutex_t vol_trans_mutex;
120 pthread_cond_t vol_put_volume_cond;
121 pthread_cond_t vol_sleep_cond;
122 pthread_cond_t vol_init_attach_cond;
123 pthread_cond_t vol_vinit_cond;
124 int vol_attach_threads = 1;
125 #endif /* AFS_PTHREAD_ENV */
126
127 /* start-time configurable I/O parameters */
128 ih_init_params vol_io_params;
129
130 #ifdef AFS_DEMAND_ATTACH_FS
131 pthread_mutex_t vol_salvsync_mutex;
132
133 /*
134  * Set this to 1 to disallow SALVSYNC communication in all threads; used
135  * during shutdown, since the salvageserver may have gone away.
136  */
137 static volatile sig_atomic_t vol_disallow_salvsync = 0;
138 #endif /* AFS_DEMAND_ATTACH_FS */
139
140 /**
141  * has VShutdown_r been called / is VShutdown_r running?
142  */
143 static int vol_shutting_down = 0;
144
145 #ifdef  AFS_OSF_ENV
146 extern void *calloc(), *realloc();
147 #endif
148
149 /* Forward declarations */
150 static Volume *attach2(Error * ec, VolumeId volumeId, char *path,
151                        struct DiskPartition64 *partp, Volume * vp,
152                        int isbusy, int mode, int *acheckedOut);
153 static void ReallyFreeVolume(Volume * vp);
154 #ifdef AFS_DEMAND_ATTACH_FS
155 static void FreeVolume(Volume * vp);
156 #else /* !AFS_DEMAND_ATTACH_FS */
157 #define FreeVolume(vp) ReallyFreeVolume(vp)
158 static void VScanUpdateList(void);
159 #endif /* !AFS_DEMAND_ATTACH_FS */
160 static void VInitVolumeHeaderCache(afs_uint32 howMany);
161 static int GetVolumeHeader(Volume * vp);
162 static void ReleaseVolumeHeader(struct volHeader *hd);
163 static void FreeVolumeHeader(Volume * vp);
164 static void AddVolumeToHashTable(Volume * vp, VolumeId hashid);
165 static void DeleteVolumeFromHashTable(Volume * vp);
166 #if 0
167 static int VHold(Volume * vp);
168 #endif
169 static int VHold_r(Volume * vp);
170 static void VGetBitmap_r(Error * ec, Volume * vp, VnodeClass class);
171 static void VReleaseVolumeHandles_r(Volume * vp);
172 static void VCloseVolumeHandles_r(Volume * vp);
173 static void LoadVolumeHeader(Error * ec, Volume * vp);
174 static int VCheckOffline(Volume * vp);
175 static int VCheckDetach(Volume * vp);
176 static Volume * GetVolume(Error * ec, Error * client_ec, VolumeId volumeId,
177                           Volume * hint, const struct timespec *ts);
178
179 int LogLevel;                   /* Vice loglevel--not defined as extern so that it will be
180                                  * defined when not linked with vice, XXXX */
181 ProgramType programType;        /* The type of program using the package */
182 static VolumePackageOptions vol_opts;
183
184 /* extended volume package statistics */
185 VolPkgStats VStats;
186
187 #ifdef VOL_LOCK_DEBUG
188 pthread_t vol_glock_holder = 0;
189 #endif
190
191
192 /* this parameter needs to be tunable at runtime.
193  * 128 was really inadequate for largish servers -- at 16384 volumes this
194  * puts average chain length at 128, thus an average 65 deref's to find a volptr.
195  * talk about bad spatial locality...
196  *
197  * an AVL or splay tree might work a lot better, but we'll just increase
198  * the default hash table size for now
199  */
200 #define DEFAULT_VOLUME_HASH_BITS 10
201 #define DEFAULT_VOLUME_HASH_SIZE opr_jhash_size(DEFAULT_VOLUME_HASH_BITS)
202 #define DEFAULT_VOLUME_HASH_MASK opr_jhash_mask(DEFAULT_VOLUME_HASH_BITS)
203 #define VOLUME_HASH(volumeId) \
204     (opr_jhash_int(volumeId, 0) & VolumeHashTable.Mask)
205
206 /*
207  * turn volume hash chains into partially ordered lists.
208  * when the threshold is exceeded between two adjacent elements,
209  * perform a chain rebalancing operation.
210  *
211  * keep the threshold high in order to keep cache line invalidates
212  * low "enough" on SMPs
213  */
214 #define VOLUME_HASH_REORDER_THRESHOLD 200
215
216 /*
217  * when possible, don't just reorder single elements, but reorder
218  * entire chains of elements at once.  a chain of elements that
219  * exceed the element previous to the pivot by at least CHAIN_THRESH
220  * accesses are moved in front of the chain whose elements have at
221  * least CHAIN_THRESH less accesses than the pivot element
222  */
223 #define VOLUME_HASH_REORDER_CHAIN_THRESH (VOLUME_HASH_REORDER_THRESHOLD / 2)
224
225 /*
226  * The per volume uniquifier is bumped by 200 and and written to disk
227  * every 200 file creates.
228  */
229 #define VOLUME_UPDATE_UNIQUIFIER_BUMP 200
230
231 #include "rx/rx_queue.h"
232
233
234 VolumeHashTable_t VolumeHashTable = {
235     DEFAULT_VOLUME_HASH_SIZE,
236     DEFAULT_VOLUME_HASH_MASK,
237     NULL
238 };
239
240
241 static void VInitVolumeHash(void);
242
243
244 #ifdef AFS_PTHREAD_ENV
245 /**
246  * disk partition queue element
247  */
248 typedef struct diskpartition_queue_t {
249     struct rx_queue queue;             /**< queue header */
250     struct DiskPartition64 *diskP;     /**< disk partition table entry */
251 } diskpartition_queue_t;
252
253 #ifndef AFS_DEMAND_ATTACH_FS
254
255 typedef struct vinitvolumepackage_thread_t {
256     struct rx_queue queue;
257     pthread_cond_t thread_done_cv;
258     int n_threads_complete;
259 } vinitvolumepackage_thread_t;
260 static void * VInitVolumePackageThread(void * args);
261
262 #else  /* !AFS_DEMAND_ATTTACH_FS */
263 #define VINIT_BATCH_MAX_SIZE 512
264
265 /**
266  * disk partition work queue
267  */
268 struct partition_queue {
269     struct rx_queue head;              /**< diskpartition_queue_t queue */
270     pthread_mutex_t mutex;
271     pthread_cond_t cv;
272 };
273
274 /**
275  * volumes parameters for preattach
276  */
277 struct volume_init_batch {
278     struct rx_queue queue;               /**< queue header */
279     int thread;                          /**< posting worker thread */
280     int last;                            /**< indicates thread is done */
281     int size;                            /**< number of volume ids in batch */
282     Volume *batch[VINIT_BATCH_MAX_SIZE]; /**< volumes ids to preattach */
283 };
284
285 /**
286  * volume parameters work queue
287  */
288 struct volume_init_queue {
289     struct rx_queue head;                /**< volume_init_batch queue */
290     pthread_mutex_t mutex;
291     pthread_cond_t cv;
292 };
293
294 /**
295  * volume init worker thread parameters
296  */
297 struct vinitvolumepackage_thread_param {
298     int nthreads;                        /**< total number of worker threads */
299     int thread;                          /**< thread number for this worker thread */
300     struct partition_queue *pq;          /**< queue partitions to scan */
301     struct volume_init_queue *vq;        /**< queue of volume to preattach */
302 };
303
304 static void *VInitVolumePackageThread(void *args);
305 static struct DiskPartition64 *VInitNextPartition(struct partition_queue *pq);
306 static VolumeId VInitNextVolumeId(DIR *dirp);
307 static int VInitPreAttachVolumes(int nthreads, struct volume_init_queue *vq);
308
309 #endif /* !AFS_DEMAND_ATTACH_FS */
310 #endif /* AFS_PTHREAD_ENV */
311
312 #ifndef AFS_DEMAND_ATTACH_FS
313 static int VAttachVolumesByPartition(struct DiskPartition64 *diskP,
314                                      int * nAttached, int * nUnattached);
315 #endif /* AFS_DEMAND_ATTACH_FS */
316
317
318 #ifdef AFS_DEMAND_ATTACH_FS
319 /* demand attach fileserver extensions */
320
321 /* XXX
322  * in the future we will support serialization of VLRU state into the fs_state
323  * disk dumps
324  *
325  * these structures are the beginning of that effort
326  */
327 struct VLRU_DiskHeader {
328     struct versionStamp stamp;            /* magic and structure version number */
329     afs_uint32 mtime;                     /* time of dump to disk */
330     afs_uint32 num_records;               /* number of VLRU_DiskEntry records */
331 };
332
333 struct VLRU_DiskEntry {
334     VolumeId vid;                       /* volume ID */
335     afs_uint32 idx;                       /* generation */
336     afs_uint32 last_get;                  /* timestamp of last get */
337 };
338
339 struct VLRU_StartupQueue {
340     struct VLRU_DiskEntry * entry;
341     int num_entries;
342     int next_idx;
343 };
344
345 typedef struct vshutdown_thread_t {
346     struct rx_queue q;
347     pthread_mutex_t lock;
348     pthread_cond_t cv;
349     pthread_cond_t master_cv;
350     int n_threads;
351     int n_threads_complete;
352     int vol_remaining;
353     int schedule_version;
354     int pass;
355     byte n_parts;
356     byte n_parts_done_pass;
357     byte part_thread_target[VOLMAXPARTS+1];
358     byte part_done_pass[VOLMAXPARTS+1];
359     struct rx_queue * part_pass_head[VOLMAXPARTS+1];
360     int stats[4][VOLMAXPARTS+1];
361 } vshutdown_thread_t;
362 static void * VShutdownThread(void * args);
363
364
365 static Volume * VAttachVolumeByVp_r(Error * ec, Volume * vp, int mode);
366 static int VCheckFree(Volume * vp);
367
368 /* VByP List */
369 static void AddVolumeToVByPList_r(Volume * vp);
370 static void DeleteVolumeFromVByPList_r(Volume * vp);
371 static void VVByPListBeginExclusive_r(struct DiskPartition64 * dp);
372 static void VVByPListEndExclusive_r(struct DiskPartition64 * dp);
373 static void VVByPListWait_r(struct DiskPartition64 * dp);
374
375 /* online salvager */
376 typedef enum {
377     VCHECK_SALVAGE_OK = 0,         /**< no pending salvage */
378     VCHECK_SALVAGE_SCHEDULED = 1,  /**< salvage has been scheduled */
379     VCHECK_SALVAGE_ASYNC = 2,      /**< salvage being scheduled */
380     VCHECK_SALVAGE_DENIED = 3,     /**< salvage not scheduled; denied */
381     VCHECK_SALVAGE_FAIL = 4        /**< salvage not scheduled; failed */
382 } vsalvage_check;
383 static int VCheckSalvage(Volume * vp);
384 #if defined(SALVSYNC_BUILD_CLIENT) || defined(FSSYNC_BUILD_CLIENT)
385 static int VScheduleSalvage_r(Volume * vp);
386 #endif
387
388 /* Volume hash table */
389 static void VReorderHash_r(VolumeHashChainHead * head, Volume * pp, Volume * vp);
390 static void VHashBeginExclusive_r(VolumeHashChainHead * head);
391 static void VHashEndExclusive_r(VolumeHashChainHead * head);
392 static void VHashWait_r(VolumeHashChainHead * head);
393
394 /* shutdown */
395 static int ShutdownVByPForPass_r(struct DiskPartition64 * dp, int pass);
396 static int ShutdownVolumeWalk_r(struct DiskPartition64 * dp, int pass,
397                                 struct rx_queue ** idx);
398 static void ShutdownController(vshutdown_thread_t * params);
399 static void ShutdownCreateSchedule(vshutdown_thread_t * params);
400
401 /* VLRU */
402 static void VLRU_ComputeConstants(void);
403 static void VInitVLRU(void);
404 static void VLRU_Init_Node_r(Volume * vp);
405 static void VLRU_Add_r(Volume * vp);
406 static void VLRU_Delete_r(Volume * vp);
407 static void VLRU_UpdateAccess_r(Volume * vp);
408 static void * VLRU_ScannerThread(void * args);
409 static void VLRU_Scan_r(int idx);
410 static void VLRU_Promote_r(int idx);
411 static void VLRU_Demote_r(int idx);
412 static void VLRU_SwitchQueues(Volume * vp, int new_idx, int append);
413
414 /* soft detach */
415 static int VCheckSoftDetach(Volume * vp, afs_uint32 thresh);
416 static int VCheckSoftDetachCandidate(Volume * vp, afs_uint32 thresh);
417 static int VSoftDetachVolume_r(Volume * vp, afs_uint32 thresh);
418
419
420 pthread_key_t VThread_key;
421 VThreadOptions_t VThread_defaults = {
422     0                           /**< allow salvsync */
423 };
424 #endif /* AFS_DEMAND_ATTACH_FS */
425
426
427 struct Lock vol_listLock;       /* Lock obtained when listing volumes:
428                                  * prevents a volume from being missed
429                                  * if the volume is attached during a
430                                  * list volumes */
431
432
433 /* Common message used when the volume goes off line */
434 char *VSalvageMessage =
435     "Files in this volume are currently unavailable; call operations";
436
437 int VInit;                      /* 0 - uninitialized,
438                                  * 1 - initialized but not all volumes have been attached,
439                                  * 2 - initialized and all volumes have been attached,
440                                  * 3 - initialized, all volumes have been attached, and
441                                  * VConnectFS() has completed. */
442
443 static int vinit_attach_abort = 0;
444
445 bit32 VolumeCacheCheck;         /* Incremented everytime a volume goes on line--
446                                  * used to stamp volume headers and in-core
447                                  * vnodes.  When the volume goes on-line the
448                                  * vnode will be invalidated
449                                  * access only with VOL_LOCK held */
450
451
452
453
454 /***************************************************/
455 /* Startup routines                                */
456 /***************************************************/
457
458 #if defined(FAST_RESTART) && defined(AFS_DEMAND_ATTACH_FS)
459 # error FAST_RESTART and DAFS are incompatible. For the DAFS equivalent \
460         of FAST_RESTART, use the -unsafe-nosalvage fileserver argument
461 #endif
462
463 /**
464  * assign default values to a VolumePackageOptions struct.
465  *
466  * Always call this on a VolumePackageOptions struct first, then set any
467  * specific options you want, then call VInitVolumePackage2.
468  *
469  * @param[in]  pt   caller's program type
470  * @param[out] opts volume package options
471  */
472 void
473 VOptDefaults(ProgramType pt, VolumePackageOptions *opts)
474 {
475     opts->nLargeVnodes = opts->nSmallVnodes = 5;
476     opts->volcache = 0;
477
478     opts->canScheduleSalvage = 0;
479     opts->canUseFSSYNC = 0;
480     opts->canUseSALVSYNC = 0;
481
482     opts->interrupt_rxcall = NULL;
483     opts->offline_timeout = -1;
484     opts->offline_shutdown_timeout = -1;
485     opts->usage_threshold = 128;
486     opts->usage_rate_limit = 5;
487
488 #ifdef FAST_RESTART
489     opts->unsafe_attach = 1;
490 #else /* !FAST_RESTART */
491     opts->unsafe_attach = 0;
492 #endif /* !FAST_RESTART */
493
494     switch (pt) {
495     case fileServer:
496         opts->canScheduleSalvage = 1;
497         opts->canUseSALVSYNC = 1;
498         break;
499
500     case salvageServer:
501         opts->canUseFSSYNC = 1;
502         break;
503
504     case volumeServer:
505         opts->nLargeVnodes = 0;
506         opts->nSmallVnodes = 0;
507
508         opts->canScheduleSalvage = 1;
509         opts->canUseFSSYNC = 1;
510         break;
511
512     default:
513         /* noop */
514         break;
515     }
516 }
517
518 /**
519  * Set VInit to a certain value, and signal waiters.
520  *
521  * @param[in] value  the value to set VInit to
522  *
523  * @pre VOL_LOCK held
524  */
525 static void
526 VSetVInit_r(int value)
527 {
528     VInit = value;
529     opr_cv_broadcast(&vol_vinit_cond);
530 }
531
532 static_inline void
533 VLogOfflineTimeout(const char *type, afs_int32 timeout)
534 {
535     if (timeout < 0) {
536         return;
537     }
538     if (timeout == 0) {
539         Log("VInitVolumePackage: Interrupting clients accessing %s "
540             "immediately\n", type);
541     } else {
542         Log("VInitVolumePackage: Interrupting clients accessing %s "
543             "after %ld second%s\n", type, (long)timeout, timeout==1?"":"s");
544     }
545 }
546
547 int
548 VInitVolumePackage2(ProgramType pt, VolumePackageOptions * opts)
549 {
550     int errors = 0;             /* Number of errors while finding vice partitions. */
551
552     programType = pt;
553     vol_opts = *opts;
554
555 #ifndef AFS_PTHREAD_ENV
556     if (opts->offline_timeout != -1 || opts->offline_shutdown_timeout != -1) {
557         Log("VInitVolumePackage: offline_timeout and/or "
558             "offline_shutdown_timeout was specified, but the volume package "
559             "does not support these for LWP builds\n");
560         return -1;
561     }
562 #endif
563     VLogOfflineTimeout("volumes going offline", opts->offline_timeout);
564     VLogOfflineTimeout("volumes going offline during shutdown",
565                        opts->offline_shutdown_timeout);
566
567     memset(&VStats, 0, sizeof(VStats));
568     VStats.hdr_cache_size = 200;
569
570     VInitPartitionPackage();
571     VInitVolumeHash();
572 #ifdef AFS_DEMAND_ATTACH_FS
573     if (programType == fileServer) {
574         VInitVLRU();
575     } else {
576         VLRU_SetOptions(VLRU_SET_ENABLED, 0);
577     }
578     opr_Verify(pthread_key_create(&VThread_key, NULL) == 0);
579 #endif
580
581     opr_mutex_init(&vol_glock_mutex);
582     opr_mutex_init(&vol_trans_mutex);
583     opr_cv_init(&vol_put_volume_cond);
584     opr_cv_init(&vol_sleep_cond);
585     opr_cv_init(&vol_init_attach_cond);
586     opr_cv_init(&vol_vinit_cond);
587 #ifndef AFS_PTHREAD_ENV
588     IOMGR_Initialize();
589 #endif /* AFS_PTHREAD_ENV */
590     Lock_Init(&vol_listLock);
591
592     srandom(time(0));           /* For VGetVolumeInfo */
593
594 #ifdef AFS_DEMAND_ATTACH_FS
595     opr_mutex_init(&vol_salvsync_mutex);
596 #endif /* AFS_DEMAND_ATTACH_FS */
597
598     /* Ok, we have done enough initialization that fileserver can
599      * start accepting calls, even though the volumes may not be
600      * available just yet.
601      */
602     VInit = 1;
603
604 #if defined(AFS_DEMAND_ATTACH_FS) && defined(SALVSYNC_BUILD_SERVER)
605     if (programType == salvageServer) {
606         SALVSYNC_salvInit();
607     }
608 #endif /* AFS_DEMAND_ATTACH_FS */
609 #ifdef FSSYNC_BUILD_SERVER
610     if (programType == fileServer) {
611         FSYNC_fsInit();
612     }
613 #endif
614 #if defined(AFS_DEMAND_ATTACH_FS) && defined(SALVSYNC_BUILD_CLIENT)
615     if (VCanUseSALVSYNC()) {
616         /* establish a connection to the salvager at this point */
617         opr_Verify(VConnectSALV() != 0);
618     }
619 #endif /* AFS_DEMAND_ATTACH_FS */
620
621     if (opts->volcache > VStats.hdr_cache_size)
622         VStats.hdr_cache_size = opts->volcache;
623     VInitVolumeHeaderCache(VStats.hdr_cache_size);
624
625     VInitVnodes(vLarge, opts->nLargeVnodes);
626     VInitVnodes(vSmall, opts->nSmallVnodes);
627
628
629     errors = VAttachPartitions();
630     if (errors)
631         return -1;
632
633     if (programType != fileServer) {
634         errors = VInitAttachVolumes(programType);
635         if (errors) {
636             return -1;
637         }
638     }
639
640 #ifdef FSSYNC_BUILD_CLIENT
641     if (VCanUseFSSYNC()) {
642         if (!VConnectFS()) {
643 #ifdef AFS_DEMAND_ATTACH_FS
644             if (programType == salvageServer) {
645                 Log("Unable to connect to file server; aborted\n");
646                 exit(1);
647             }
648 #endif /* AFS_DEMAND_ATTACH_FS */
649             Log("Unable to connect to file server; will retry at need\n");
650         }
651     }
652 #endif /* FSSYNC_BUILD_CLIENT */
653     return 0;
654 }
655
656
657 #if !defined(AFS_PTHREAD_ENV)
658 /**
659  * Attach volumes in vice partitions
660  *
661  * @param[in]  pt         calling program type
662  *
663  * @return 0
664  * @note This is the original, non-threaded version of attach parititions.
665  *
666  * @post VInit state is 2
667  */
668 int
669 VInitAttachVolumes(ProgramType pt)
670 {
671     opr_Assert(VInit==1);
672     if (pt == fileServer) {
673         struct DiskPartition64 *diskP;
674         /* Attach all the volumes in this partition */
675         for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
676             int nAttached = 0, nUnattached = 0;
677             opr_Verify(VAttachVolumesByPartition(diskP,
678                                                  &nAttached, &nUnattached)
679                             == 0);
680         }
681     }
682     VOL_LOCK;
683     VSetVInit_r(2);                     /* Initialized, and all volumes have been attached */
684     LWP_NoYieldSignal(VInitAttachVolumes);
685     VOL_UNLOCK;
686     return 0;
687 }
688 #endif /* !AFS_PTHREAD_ENV */
689
690 #if defined(AFS_PTHREAD_ENV) && !defined(AFS_DEMAND_ATTACH_FS)
691 /**
692  * Attach volumes in vice partitions
693  *
694  * @param[in]  pt         calling program type
695  *
696  * @return 0
697  * @note Threaded version of attach parititions.
698  *
699  * @post VInit state is 2
700  */
701 int
702 VInitAttachVolumes(ProgramType pt)
703 {
704     opr_Assert(VInit==1);
705     if (pt == fileServer) {
706         struct DiskPartition64 *diskP;
707         struct vinitvolumepackage_thread_t params;
708         struct diskpartition_queue_t * dpq;
709         int i, threads, parts;
710         pthread_t tid;
711         pthread_attr_t attrs;
712
713         opr_cv_init(&params.thread_done_cv);
714         queue_Init(&params);
715         params.n_threads_complete = 0;
716
717         /* create partition work queue */
718         for (parts=0, diskP = DiskPartitionList; diskP; diskP = diskP->next, parts++) {
719             dpq = malloc(sizeof(struct diskpartition_queue_t));
720             opr_Assert(dpq != NULL);
721             dpq->diskP = diskP;
722             queue_Append(&params,dpq);
723         }
724
725         threads = min(parts, vol_attach_threads);
726
727         if (threads > 1) {
728             /* spawn off a bunch of initialization threads */
729             opr_Verify(pthread_attr_init(&attrs) == 0);
730             opr_Verify(pthread_attr_setdetachstate(&attrs,
731                                                    PTHREAD_CREATE_DETACHED)
732                             == 0);
733
734             Log("VInitVolumePackage: beginning parallel fileserver startup\n");
735             Log("VInitVolumePackage: using %d threads to attach volumes on %d partitions\n",
736                 threads, parts);
737
738             VOL_LOCK;
739             for (i=0; i < threads; i++) {
740                 AFS_SIGSET_DECL;
741                 AFS_SIGSET_CLEAR();
742                 opr_Verify(pthread_create(&tid, &attrs,
743                                           &VInitVolumePackageThread,
744                                           &params) == 0);
745                 AFS_SIGSET_RESTORE();
746             }
747
748             while(params.n_threads_complete < threads) {
749                 VOL_CV_WAIT(&params.thread_done_cv);
750             }
751             VOL_UNLOCK;
752
753             opr_Verify(pthread_attr_destroy(&attrs) == 0);
754         } else {
755             /* if we're only going to run one init thread, don't bother creating
756              * another LWP */
757             Log("VInitVolumePackage: beginning single-threaded fileserver startup\n");
758             Log("VInitVolumePackage: using 1 thread to attach volumes on %d partition(s)\n",
759                 parts);
760
761             VInitVolumePackageThread(&params);
762         }
763
764         opr_cv_destroy(&params.thread_done_cv);
765     }
766     VOL_LOCK;
767     VSetVInit_r(2);                     /* Initialized, and all volumes have been attached */
768     opr_cv_broadcast(&vol_init_attach_cond);
769     VOL_UNLOCK;
770     return 0;
771 }
772
773 static void *
774 VInitVolumePackageThread(void * args) {
775
776     struct DiskPartition64 *diskP;
777     struct vinitvolumepackage_thread_t * params;
778     struct diskpartition_queue_t * dpq;
779
780     params = (vinitvolumepackage_thread_t *) args;
781
782
783     VOL_LOCK;
784     /* Attach all the volumes in this partition */
785     while (queue_IsNotEmpty(params)) {
786         int nAttached = 0, nUnattached = 0;
787
788         if (vinit_attach_abort) {
789             Log("Aborting initialization\n");
790             goto done;
791         }
792
793         dpq = queue_First(params,diskpartition_queue_t);
794         queue_Remove(dpq);
795         VOL_UNLOCK;
796         diskP = dpq->diskP;
797         free(dpq);
798
799         opr_Verify(VAttachVolumesByPartition(diskP, &nAttached,
800                                              &nUnattached) == 0);
801
802         VOL_LOCK;
803     }
804
805 done:
806     params->n_threads_complete++;
807     opr_cv_signal(&params->thread_done_cv);
808     VOL_UNLOCK;
809     return NULL;
810 }
811 #endif /* AFS_PTHREAD_ENV && !AFS_DEMAND_ATTACH_FS */
812
813 #if defined(AFS_DEMAND_ATTACH_FS)
814 /**
815  * Attach volumes in vice partitions
816  *
817  * @param[in]  pt         calling program type
818  *
819  * @return 0
820  * @note Threaded version of attach partitions.
821  *
822  * @post VInit state is 2
823  */
824 int
825 VInitAttachVolumes(ProgramType pt)
826 {
827     opr_Assert(VInit==1);
828     if (pt == fileServer) {
829
830         struct DiskPartition64 *diskP;
831         struct partition_queue pq;
832         struct volume_init_queue vq;
833
834         int i, threads, parts;
835         pthread_t tid;
836         pthread_attr_t attrs;
837
838         /* create partition work queue */
839         queue_Init(&pq);
840         opr_cv_init(&pq.cv);
841         opr_mutex_init(&pq.mutex);
842         for (parts = 0, diskP = DiskPartitionList; diskP; diskP = diskP->next, parts++) {
843             struct diskpartition_queue_t *dp;
844             dp = malloc(sizeof(struct diskpartition_queue_t));
845             opr_Assert(dp != NULL);
846             dp->diskP = diskP;
847             queue_Append(&pq, dp);
848         }
849
850         /* number of worker threads; at least one, not to exceed the number of partitions */
851         threads = min(parts, vol_attach_threads);
852
853         /* create volume work queue */
854         queue_Init(&vq);
855         opr_cv_init(&vq.cv);
856         opr_mutex_init(&vq.mutex);
857
858         opr_Verify(pthread_attr_init(&attrs) == 0);
859         opr_Verify(pthread_attr_setdetachstate(&attrs,
860                                                PTHREAD_CREATE_DETACHED) == 0);
861
862         Log("VInitVolumePackage: beginning parallel fileserver startup\n");
863         Log("VInitVolumePackage: using %d threads to pre-attach volumes on %d partitions\n",
864                 threads, parts);
865
866         /* create threads to scan disk partitions. */
867         for (i=0; i < threads; i++) {
868             struct vinitvolumepackage_thread_param *params;
869             AFS_SIGSET_DECL;
870
871             params = malloc(sizeof(struct vinitvolumepackage_thread_param));
872             opr_Assert(params);
873             params->pq = &pq;
874             params->vq = &vq;
875             params->nthreads = threads;
876             params->thread = i+1;
877
878             AFS_SIGSET_CLEAR();
879             opr_Verify(pthread_create(&tid, &attrs,
880                                       &VInitVolumePackageThread,
881                                       (void*)params) == 0);
882             AFS_SIGSET_RESTORE();
883         }
884
885         VInitPreAttachVolumes(threads, &vq);
886
887         opr_Verify(pthread_attr_destroy(&attrs) == 0);
888         opr_cv_destroy(&pq.cv);
889         opr_mutex_destroy(&pq.mutex);
890         opr_cv_destroy(&vq.cv);
891         opr_mutex_destroy(&vq.mutex);
892     }
893
894     VOL_LOCK;
895     VSetVInit_r(2);                     /* Initialized, and all volumes have been attached */
896     opr_cv_broadcast(&vol_init_attach_cond);
897     VOL_UNLOCK;
898
899     return 0;
900 }
901
902 /**
903  * Volume package initialization worker thread. Scan partitions for volume
904  * header files. Gather batches of volume ids and dispatch them to
905  * the main thread to be preattached.  The volume preattachement is done
906  * in the main thread to avoid global volume lock contention.
907  */
908 static void *
909 VInitVolumePackageThread(void *args)
910 {
911     struct vinitvolumepackage_thread_param *params;
912     struct DiskPartition64 *partition;
913     struct partition_queue *pq;
914     struct volume_init_queue *vq;
915     struct volume_init_batch *vb;
916
917     opr_Assert(args);
918     params = (struct vinitvolumepackage_thread_param *)args;
919     pq = params->pq;
920     vq = params->vq;
921     opr_Assert(pq);
922     opr_Assert(vq);
923
924     vb = malloc(sizeof(struct volume_init_batch));
925     opr_Assert(vb);
926     vb->thread = params->thread;
927     vb->last = 0;
928     vb->size = 0;
929
930     Log("Scanning partitions on thread %d of %d\n", params->thread, params->nthreads);
931     while((partition = VInitNextPartition(pq))) {
932         DIR *dirp;
933         VolumeId vid;
934
935         Log("Partition %s: pre-attaching volumes\n", partition->name);
936         dirp = opendir(VPartitionPath(partition));
937         if (!dirp) {
938             Log("opendir on Partition %s failed, errno=%d!\n", partition->name, errno);
939             continue;
940         }
941         while ((vid = VInitNextVolumeId(dirp))) {
942             Volume *vp = calloc(1, sizeof(Volume));
943             opr_Assert(vp);
944             vp->device = partition->device;
945             vp->partition = partition;
946             vp->hashid = vid;
947             queue_Init(&vp->vnode_list);
948             queue_Init(&vp->rx_call_list);
949             opr_cv_init(&V_attachCV(vp));
950
951             vb->batch[vb->size++] = vp;
952             if (vb->size == VINIT_BATCH_MAX_SIZE) {
953                 opr_mutex_enter(&vq->mutex);
954                 queue_Append(vq, vb);
955                 opr_cv_broadcast(&vq->cv);
956                 opr_mutex_exit(&vq->mutex);
957
958                 vb = malloc(sizeof(struct volume_init_batch));
959                 opr_Assert(vb);
960                 vb->thread = params->thread;
961                 vb->size = 0;
962                 vb->last = 0;
963             }
964         }
965         closedir(dirp);
966     }
967
968     vb->last = 1;
969     opr_mutex_enter(&vq->mutex);
970     queue_Append(vq, vb);
971     opr_cv_broadcast(&vq->cv);
972     opr_mutex_exit(&vq->mutex);
973
974     Log("Partition scan thread %d of %d ended\n", params->thread, params->nthreads);
975     free(params);
976     return NULL;
977 }
978
979 /**
980  * Read next element from the pre-populated partition list.
981  */
982 static struct DiskPartition64*
983 VInitNextPartition(struct partition_queue *pq)
984 {
985     struct DiskPartition64 *partition;
986     struct diskpartition_queue_t *dp; /* queue element */
987
988     if (vinit_attach_abort) {
989         Log("Aborting volume preattach thread.\n");
990         return NULL;
991     }
992
993     /* get next partition to scan */
994     opr_mutex_enter(&pq->mutex);
995     if (queue_IsEmpty(pq)) {
996         opr_mutex_exit(&pq->mutex);
997         return NULL;
998     }
999     dp = queue_First(pq, diskpartition_queue_t);
1000     queue_Remove(dp);
1001     opr_mutex_exit(&pq->mutex);
1002
1003     opr_Assert(dp);
1004     opr_Assert(dp->diskP);
1005
1006     partition = dp->diskP;
1007     free(dp);
1008     return partition;
1009 }
1010
1011 /**
1012  * Find next volume id on the partition.
1013  */
1014 static VolumeId
1015 VInitNextVolumeId(DIR *dirp)
1016 {
1017     struct dirent *d;
1018     VolumeId vid = 0;
1019     char *ext;
1020
1021     while((d = readdir(dirp))) {
1022         if (vinit_attach_abort) {
1023             Log("Aborting volume preattach thread.\n");
1024             break;
1025         }
1026         ext = strrchr(d->d_name, '.');
1027         if (d->d_name[0] == 'V' && ext && strcmp(ext, VHDREXT) == 0) {
1028             vid = VolumeNumber(d->d_name);
1029             if (vid) {
1030                break;
1031             }
1032             Log("Warning: bogus volume header file: %s\n", d->d_name);
1033         }
1034     }
1035     return vid;
1036 }
1037
1038 /**
1039  * Preattach volumes in batches to avoid lock contention.
1040  */
1041 static int
1042 VInitPreAttachVolumes(int nthreads, struct volume_init_queue *vq)
1043 {
1044     struct volume_init_batch *vb;
1045     int i;
1046
1047     while (nthreads) {
1048         /* dequeue next volume */
1049         opr_mutex_enter(&vq->mutex);
1050         if (queue_IsEmpty(vq)) {
1051             opr_cv_wait(&vq->cv, &vq->mutex);
1052         }
1053         vb = queue_First(vq, volume_init_batch);
1054         queue_Remove(vb);
1055         opr_mutex_exit(&vq->mutex);
1056
1057         if (vb->size) {
1058             VOL_LOCK;
1059             for (i = 0; i<vb->size; i++) {
1060                 Volume *vp;
1061                 Volume *dup;
1062                 Error ec = 0;
1063
1064                 vp = vb->batch[i];
1065                 dup = VLookupVolume_r(&ec, vp->hashid, NULL);
1066                 if (ec) {
1067                     Log("Error looking up volume, code=%d\n", ec);
1068                 }
1069                 else if (dup) {
1070                     Log("Warning: Duplicate volume id %" AFS_VOLID_FMT " detected.\n", afs_printable_VolumeId_lu(vp->hashid));
1071                 }
1072                 else {
1073                     /* put pre-attached volume onto the hash table
1074                      * and bring it up to the pre-attached state */
1075                     AddVolumeToHashTable(vp, vp->hashid);
1076                     AddVolumeToVByPList_r(vp);
1077                     VLRU_Init_Node_r(vp);
1078                     VChangeState_r(vp, VOL_STATE_PREATTACHED);
1079                 }
1080             }
1081             VOL_UNLOCK;
1082         }
1083
1084         if (vb->last) {
1085             nthreads--;
1086         }
1087         free(vb);
1088     }
1089     return 0;
1090 }
1091 #endif /* AFS_DEMAND_ATTACH_FS */
1092
1093 #if !defined(AFS_DEMAND_ATTACH_FS)
1094 /*
1095  * attach all volumes on a given disk partition
1096  */
1097 static int
1098 VAttachVolumesByPartition(struct DiskPartition64 *diskP, int * nAttached, int * nUnattached)
1099 {
1100   DIR * dirp;
1101   struct dirent * dp;
1102   int ret = 0;
1103
1104   Log("Partition %s: attaching volumes\n", diskP->name);
1105   dirp = opendir(VPartitionPath(diskP));
1106   if (!dirp) {
1107     Log("opendir on Partition %s failed!\n", diskP->name);
1108     return 1;
1109   }
1110
1111   while ((dp = readdir(dirp))) {
1112     char *p;
1113     p = strrchr(dp->d_name, '.');
1114
1115     if (vinit_attach_abort) {
1116       Log("Partition %s: abort attach volumes\n", diskP->name);
1117       goto done;
1118     }
1119
1120     if (p != NULL && strcmp(p, VHDREXT) == 0) {
1121       Error error;
1122       Volume *vp;
1123       vp = VAttachVolumeByName(&error, diskP->name, dp->d_name,
1124                                V_VOLUPD);
1125       (*(vp ? nAttached : nUnattached))++;
1126       if (error == VOFFLINE)
1127         Log("Volume %d stays offline (/vice/offline/%s exists)\n", VolumeNumber(dp->d_name), dp->d_name);
1128       else if (LogLevel >= 5) {
1129         Log("Partition %s: attached volume %d (%s)\n",
1130             diskP->name, VolumeNumber(dp->d_name),
1131             dp->d_name);
1132       }
1133       if (vp) {
1134         VPutVolume(vp);
1135       }
1136     }
1137   }
1138
1139   Log("Partition %s: attached %d volumes; %d volumes not attached\n", diskP->name, *nAttached, *nUnattached);
1140 done:
1141   closedir(dirp);
1142   return ret;
1143 }
1144 #endif /* !AFS_DEMAND_ATTACH_FS */
1145
1146 /***************************************************/
1147 /* Shutdown routines                               */
1148 /***************************************************/
1149
1150 /*
1151  * demand attach fs
1152  * highly multithreaded volume package shutdown
1153  *
1154  * with the demand attach fileserver extensions,
1155  * VShutdown has been modified to be multithreaded.
1156  * In order to achieve optimal use of many threads,
1157  * the shutdown code involves one control thread and
1158  * n shutdown worker threads.  The control thread
1159  * periodically examines the number of volumes available
1160  * for shutdown on each partition, and produces a worker
1161  * thread allocation schedule.  The idea is to eliminate
1162  * redundant scheduling computation on the workers by
1163  * having a single master scheduler.
1164  *
1165  * The scheduler's objectives are:
1166  * (1) fairness
1167  *   each partition with volumes remaining gets allocated
1168  *   at least 1 thread (assuming sufficient threads)
1169  * (2) performance
1170  *   threads are allocated proportional to the number of
1171  *   volumes remaining to be offlined.  This ensures that
1172  *   the OS I/O scheduler has many requests to elevator
1173  *   seek on partitions that will (presumably) take the
1174  *   longest amount of time (from now) to finish shutdown
1175  * (3) keep threads busy
1176  *   when there are extra threads, they are assigned to
1177  *   partitions using a simple round-robin algorithm
1178  *
1179  * In the future, we may wish to add the ability to adapt
1180  * to the relative performance patterns of each disk
1181  * partition.
1182  *
1183  *
1184  * demand attach fs
1185  * multi-step shutdown process
1186  *
1187  * demand attach shutdown is a four-step process. Each
1188  * shutdown "pass" shuts down increasingly more difficult
1189  * volumes.  The main purpose is to achieve better cache
1190  * utilization during shutdown.
1191  *
1192  * pass 0
1193  *   shutdown volumes in the unattached, pre-attached
1194  *   and error states
1195  * pass 1
1196  *   shutdown attached volumes with cached volume headers
1197  * pass 2
1198  *   shutdown all volumes in non-exclusive states
1199  * pass 3
1200  *   shutdown all remaining volumes
1201  */
1202
1203 #ifdef AFS_DEMAND_ATTACH_FS
1204
1205 void
1206 VShutdown_r(void)
1207 {
1208     int i;
1209     struct DiskPartition64 * diskP;
1210     struct diskpartition_queue_t * dpq;
1211     vshutdown_thread_t params;
1212     pthread_t tid;
1213     pthread_attr_t attrs;
1214
1215     memset(&params, 0, sizeof(vshutdown_thread_t));
1216
1217     if (VInit < 2) {
1218         Log("VShutdown:  aborting attach volumes\n");
1219         vinit_attach_abort = 1;
1220         VOL_CV_WAIT(&vol_init_attach_cond);
1221     }
1222
1223     for (params.n_parts=0, diskP = DiskPartitionList;
1224          diskP; diskP = diskP->next, params.n_parts++);
1225
1226     Log("VShutdown:  shutting down on-line volumes on %d partition%s...\n",
1227         params.n_parts, params.n_parts > 1 ? "s" : "");
1228
1229     vol_shutting_down = 1;
1230
1231     if (vol_attach_threads > 1) {
1232         /* prepare for parallel shutdown */
1233         params.n_threads = vol_attach_threads;
1234         opr_mutex_init(&params.lock);
1235         opr_cv_init(&params.cv);
1236         opr_cv_init(&params.master_cv);
1237         opr_Verify(pthread_attr_init(&attrs) == 0);
1238         opr_Verify(pthread_attr_setdetachstate(&attrs,
1239                                                PTHREAD_CREATE_DETACHED) == 0);
1240         queue_Init(&params);
1241
1242         /* setup the basic partition information structures for
1243          * parallel shutdown */
1244         for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1245             /* XXX debug */
1246             struct rx_queue * qp, * nqp;
1247             Volume * vp;
1248             int count = 0;
1249
1250             VVByPListWait_r(diskP);
1251             VVByPListBeginExclusive_r(diskP);
1252
1253             /* XXX debug */
1254             for (queue_Scan(&diskP->vol_list, qp, nqp, rx_queue)) {
1255                 vp = (Volume *)((char *)qp - offsetof(Volume, vol_list));
1256                 if (vp->header)
1257                     count++;
1258             }
1259             Log("VShutdown: partition %s has %d volumes with attached headers\n",
1260                 VPartitionPath(diskP), count);
1261
1262
1263             /* build up the pass 0 shutdown work queue */
1264             dpq = malloc(sizeof(struct diskpartition_queue_t));
1265             opr_Assert(dpq != NULL);
1266             dpq->diskP = diskP;
1267             queue_Prepend(&params, dpq);
1268
1269             params.part_pass_head[diskP->index] = queue_First(&diskP->vol_list, rx_queue);
1270         }
1271
1272         Log("VShutdown:  beginning parallel fileserver shutdown\n");
1273         Log("VShutdown:  using %d threads to offline volumes on %d partition%s\n",
1274             vol_attach_threads, params.n_parts, params.n_parts > 1 ? "s" : "" );
1275
1276         /* do pass 0 shutdown */
1277         opr_mutex_enter(&params.lock);
1278         for (i=0; i < params.n_threads; i++) {
1279             opr_Verify(pthread_create(&tid, &attrs, &VShutdownThread,
1280                                       &params) == 0);
1281         }
1282
1283         /* wait for all the pass 0 shutdowns to complete */
1284         while (params.n_threads_complete < params.n_threads) {
1285             CV_WAIT(&params.master_cv, &params.lock);
1286         }
1287         params.n_threads_complete = 0;
1288         params.pass = 1;
1289         opr_cv_broadcast(&params.cv);
1290         opr_mutex_exit(&params.lock);
1291
1292         Log("VShutdown:  pass 0 completed using the 1 thread per partition algorithm\n");
1293         Log("VShutdown:  starting passes 1 through 3 using finely-granular mp-fast algorithm\n");
1294
1295         /* run the parallel shutdown scheduler. it will drop the glock internally */
1296         ShutdownController(&params);
1297
1298         /* wait for all the workers to finish pass 3 and terminate */
1299         while (params.pass < 4) {
1300             VOL_CV_WAIT(&params.cv);
1301         }
1302
1303         opr_Verify(pthread_attr_destroy(&attrs) == 0);
1304         opr_cv_destroy(&params.cv);
1305         opr_cv_destroy(&params.master_cv);
1306         opr_mutex_destroy(&params.lock);
1307
1308         /* drop the VByPList exclusive reservations */
1309         for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1310             VVByPListEndExclusive_r(diskP);
1311             Log("VShutdown:  %s stats : (pass[0]=%d, pass[1]=%d, pass[2]=%d, pass[3]=%d)\n",
1312                 VPartitionPath(diskP),
1313                 params.stats[0][diskP->index],
1314                 params.stats[1][diskP->index],
1315                 params.stats[2][diskP->index],
1316                 params.stats[3][diskP->index]);
1317         }
1318
1319         Log("VShutdown:  shutdown finished using %d threads\n", params.n_threads);
1320     } else {
1321         /* if we're only going to run one shutdown thread, don't bother creating
1322          * another LWP */
1323         Log("VShutdown:  beginning single-threaded fileserver shutdown\n");
1324
1325         for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1326             VShutdownByPartition_r(diskP);
1327         }
1328     }
1329
1330     Log("VShutdown:  complete.\n");
1331 }
1332
1333 #else /* AFS_DEMAND_ATTACH_FS */
1334
1335 void
1336 VShutdown_r(void)
1337 {
1338     int i;
1339     Volume *vp, *np;
1340     afs_int32 code;
1341
1342     if (VInit < 2) {
1343         Log("VShutdown:  aborting attach volumes\n");
1344         vinit_attach_abort = 1;
1345 #ifdef AFS_PTHREAD_ENV
1346         VOL_CV_WAIT(&vol_init_attach_cond);
1347 #else
1348         LWP_WaitProcess(VInitAttachVolumes);
1349 #endif /* AFS_PTHREAD_ENV */
1350     }
1351
1352     Log("VShutdown:  shutting down on-line volumes...\n");
1353     vol_shutting_down = 1;
1354     for (i = 0; i < VolumeHashTable.Size; i++) {
1355         /* try to hold first volume in the hash table */
1356         for (queue_Scan(&VolumeHashTable.Table[i],vp,np,Volume)) {
1357             code = VHold_r(vp);
1358             if (code == 0) {
1359                 if (LogLevel >= 5)
1360                     Log("VShutdown:  Attempting to take volume %" AFS_VOLID_FMT " offline.\n",
1361                         afs_printable_VolumeId_lu(vp->hashid));
1362
1363                 /* next, take the volume offline (drops reference count) */
1364                 VOffline_r(vp, "File server was shut down");
1365             }
1366         }
1367     }
1368     Log("VShutdown:  complete.\n");
1369 }
1370 #endif /* AFS_DEMAND_ATTACH_FS */
1371
1372
1373 void
1374 VShutdown(void)
1375 {
1376     opr_Assert(VInit>0);
1377     VOL_LOCK;
1378     VShutdown_r();
1379     VOL_UNLOCK;
1380 }
1381
1382 /**
1383  * stop new activity (e.g. SALVSYNC) from occurring
1384  *
1385  * Use this to make the volume package less busy; for example, during
1386  * shutdown. This doesn't actually shutdown/detach anything in the
1387  * volume package, but prevents certain processes from ocurring. For
1388  * example, preventing new SALVSYNC communication in DAFS. In theory, we
1389  * could also use this to prevent new volume attachment, or prevent
1390  * other programs from checking out volumes, etc.
1391  */
1392 void
1393 VSetTranquil(void)
1394 {
1395 #ifdef AFS_DEMAND_ATTACH_FS
1396     /* make sure we don't try to contact the salvageserver, since it may
1397      * not be around anymore */
1398     vol_disallow_salvsync = 1;
1399 #endif
1400 }
1401
1402 #ifdef AFS_DEMAND_ATTACH_FS
1403 /*
1404  * demand attach fs
1405  * shutdown control thread
1406  */
1407 static void
1408 ShutdownController(vshutdown_thread_t * params)
1409 {
1410     /* XXX debug */
1411     struct DiskPartition64 * diskP;
1412     Device id;
1413     vshutdown_thread_t shadow;
1414
1415     ShutdownCreateSchedule(params);
1416
1417     while ((params->pass < 4) &&
1418            (params->n_threads_complete < params->n_threads)) {
1419         /* recompute schedule once per second */
1420
1421         memcpy(&shadow, params, sizeof(vshutdown_thread_t));
1422
1423         VOL_UNLOCK;
1424         /* XXX debug */
1425         Log("ShutdownController:  schedule version=%d, vol_remaining=%d, pass=%d\n",
1426             shadow.schedule_version, shadow.vol_remaining, shadow.pass);
1427         Log("ShutdownController:  n_threads_complete=%d, n_parts_done_pass=%d\n",
1428             shadow.n_threads_complete, shadow.n_parts_done_pass);
1429         for (diskP = DiskPartitionList; diskP; diskP=diskP->next) {
1430             id = diskP->index;
1431             Log("ShutdownController:  part[%d] : (len=%d, thread_target=%d, done_pass=%d, pass_head=%p)\n",
1432                 id,
1433                 diskP->vol_list.len,
1434                 shadow.part_thread_target[id],
1435                 shadow.part_done_pass[id],
1436                 shadow.part_pass_head[id]);
1437         }
1438
1439         sleep(1);
1440         VOL_LOCK;
1441
1442         ShutdownCreateSchedule(params);
1443     }
1444 }
1445
1446 /* create the shutdown thread work schedule.
1447  * this scheduler tries to implement fairness
1448  * by allocating at least 1 thread to each
1449  * partition with volumes to be shutdown,
1450  * and then it attempts to allocate remaining
1451  * threads based upon the amount of work left
1452  */
1453 static void
1454 ShutdownCreateSchedule(vshutdown_thread_t * params)
1455 {
1456     struct DiskPartition64 * diskP;
1457     int sum, thr_workload, thr_left;
1458     int part_residue[VOLMAXPARTS+1];
1459     Device id;
1460
1461     /* compute the total number of outstanding volumes */
1462     sum = 0;
1463     for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1464         sum += diskP->vol_list.len;
1465     }
1466
1467     params->schedule_version++;
1468     params->vol_remaining = sum;
1469
1470     if (!sum)
1471         return;
1472
1473     /* compute average per-thread workload */
1474     thr_workload = sum / params->n_threads;
1475     if (sum % params->n_threads)
1476         thr_workload++;
1477
1478     thr_left = params->n_threads;
1479     memset(&part_residue, 0, sizeof(part_residue));
1480
1481     /* for fairness, give every partition with volumes remaining
1482      * at least one thread */
1483     for (diskP = DiskPartitionList; diskP && thr_left; diskP = diskP->next) {
1484         id = diskP->index;
1485         if (diskP->vol_list.len) {
1486             params->part_thread_target[id] = 1;
1487             thr_left--;
1488         } else {
1489             params->part_thread_target[id] = 0;
1490         }
1491     }
1492
1493     if (thr_left && thr_workload) {
1494         /* compute length-weighted workloads */
1495         int delta;
1496
1497         for (diskP = DiskPartitionList; diskP && thr_left; diskP = diskP->next) {
1498             id = diskP->index;
1499             delta = (diskP->vol_list.len / thr_workload) -
1500                 params->part_thread_target[id];
1501             if (delta < 0) {
1502                 continue;
1503             }
1504             if (delta < thr_left) {
1505                 params->part_thread_target[id] += delta;
1506                 thr_left -= delta;
1507             } else {
1508                 params->part_thread_target[id] += thr_left;
1509                 thr_left = 0;
1510                 break;
1511             }
1512         }
1513     }
1514
1515     if (thr_left) {
1516         /* try to assign any leftover threads to partitions that
1517          * had volume lengths closer to needing thread_target+1 */
1518         int max_residue, max_id = 0;
1519
1520         /* compute the residues */
1521         for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1522             id = diskP->index;
1523             part_residue[id] = diskP->vol_list.len -
1524                 (params->part_thread_target[id] * thr_workload);
1525         }
1526
1527         /* now try to allocate remaining threads to partitions with the
1528          * highest residues */
1529         while (thr_left) {
1530             max_residue = 0;
1531             for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1532                 id = diskP->index;
1533                 if (part_residue[id] > max_residue) {
1534                     max_residue = part_residue[id];
1535                     max_id = id;
1536                 }
1537             }
1538
1539             if (!max_residue) {
1540                 break;
1541             }
1542
1543             params->part_thread_target[max_id]++;
1544             thr_left--;
1545             part_residue[max_id] = 0;
1546         }
1547     }
1548
1549     if (thr_left) {
1550         /* punt and give any remaining threads equally to each partition */
1551         int alloc;
1552         if (thr_left >= params->n_parts) {
1553             alloc = thr_left / params->n_parts;
1554             for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1555                 id = diskP->index;
1556                 params->part_thread_target[id] += alloc;
1557                 thr_left -= alloc;
1558             }
1559         }
1560
1561         /* finish off the last of the threads */
1562         for (diskP = DiskPartitionList; thr_left && diskP; diskP = diskP->next) {
1563             id = diskP->index;
1564             params->part_thread_target[id]++;
1565             thr_left--;
1566         }
1567     }
1568 }
1569
1570 /* worker thread for parallel shutdown */
1571 static void *
1572 VShutdownThread(void * args)
1573 {
1574     vshutdown_thread_t * params;
1575     int found, pass, schedule_version_save, count;
1576     struct DiskPartition64 *diskP;
1577     struct diskpartition_queue_t * dpq;
1578     Device id;
1579
1580     params = (vshutdown_thread_t *) args;
1581
1582     /* acquire the shutdown pass 0 lock */
1583     opr_mutex_enter(&params->lock);
1584
1585     /* if there's still pass 0 work to be done,
1586      * get a work entry, and do a pass 0 shutdown */
1587     if (queue_IsNotEmpty(params)) {
1588         dpq = queue_First(params, diskpartition_queue_t);
1589         queue_Remove(dpq);
1590         opr_mutex_exit(&params->lock);
1591         diskP = dpq->diskP;
1592         free(dpq);
1593         id = diskP->index;
1594
1595         count = 0;
1596         while (ShutdownVolumeWalk_r(diskP, 0, &params->part_pass_head[id]))
1597             count++;
1598         params->stats[0][diskP->index] = count;
1599         opr_mutex_enter(&params->lock);
1600     }
1601
1602     params->n_threads_complete++;
1603     if (params->n_threads_complete == params->n_threads) {
1604         /* notify control thread that all workers have completed pass 0 */
1605         opr_cv_signal(&params->master_cv);
1606     }
1607     while (params->pass == 0) {
1608         opr_cv_wait(&params->cv, &params->lock);
1609     }
1610
1611     /* switch locks */
1612     opr_mutex_exit(&params->lock);
1613     VOL_LOCK;
1614
1615     pass = params->pass;
1616     opr_Assert(pass > 0);
1617
1618     /* now escalate through the more complicated shutdowns */
1619     while (pass <= 3) {
1620         schedule_version_save = params->schedule_version;
1621         found = 0;
1622         /* find a disk partition to work on */
1623         for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1624             id = diskP->index;
1625             if (params->part_thread_target[id] && !params->part_done_pass[id]) {
1626                 params->part_thread_target[id]--;
1627                 found = 1;
1628                 break;
1629             }
1630         }
1631
1632         if (!found) {
1633             /* hmm. for some reason the controller thread couldn't find anything for
1634              * us to do. let's see if there's anything we can do */
1635             for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1636                 id = diskP->index;
1637                 if (diskP->vol_list.len && !params->part_done_pass[id]) {
1638                     found = 1;
1639                     break;
1640                 } else if (!params->part_done_pass[id]) {
1641                     params->part_done_pass[id] = 1;
1642                     params->n_parts_done_pass++;
1643                     if (pass == 3) {
1644                         Log("VShutdown:  done shutting down volumes on partition %s.\n",
1645                             VPartitionPath(diskP));
1646                     }
1647                 }
1648             }
1649         }
1650
1651         /* do work on this partition until either the controller
1652          * creates a new schedule, or we run out of things to do
1653          * on this partition */
1654         if (found) {
1655             count = 0;
1656             while (!params->part_done_pass[id] &&
1657                    (schedule_version_save == params->schedule_version)) {
1658                 /* ShutdownVolumeWalk_r will drop the glock internally */
1659                 if (!ShutdownVolumeWalk_r(diskP, pass, &params->part_pass_head[id])) {
1660                     if (!params->part_done_pass[id]) {
1661                         params->part_done_pass[id] = 1;
1662                         params->n_parts_done_pass++;
1663                         if (pass == 3) {
1664                             Log("VShutdown:  done shutting down volumes on partition %s.\n",
1665                                 VPartitionPath(diskP));
1666                         }
1667                     }
1668                     break;
1669                 }
1670                 count++;
1671             }
1672
1673             params->stats[pass][id] += count;
1674         } else {
1675             /* ok, everyone is done this pass, proceed */
1676
1677             /* barrier lock */
1678             params->n_threads_complete++;
1679             while (params->pass == pass) {
1680                 if (params->n_threads_complete == params->n_threads) {
1681                     /* we are the last thread to complete, so we will
1682                      * reinitialize worker pool state for the next pass */
1683                     params->n_threads_complete = 0;
1684                     params->n_parts_done_pass = 0;
1685                     params->pass++;
1686                     for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
1687                         id = diskP->index;
1688                         params->part_done_pass[id] = 0;
1689                         params->part_pass_head[id] = queue_First(&diskP->vol_list, rx_queue);
1690                     }
1691
1692                     /* compute a new thread schedule before releasing all the workers */
1693                     ShutdownCreateSchedule(params);
1694
1695                     /* wake up all the workers */
1696                     opr_cv_broadcast(&params->cv);
1697
1698                     VOL_UNLOCK;
1699                     Log("VShutdown:  pass %d completed using %d threads on %d partitions\n",
1700                         pass, params->n_threads, params->n_parts);
1701                     VOL_LOCK;
1702                 } else {
1703                     VOL_CV_WAIT(&params->cv);
1704                 }
1705             }
1706             pass = params->pass;
1707         }
1708
1709         /* for fairness */
1710         VOL_UNLOCK;
1711         pthread_yield();
1712         VOL_LOCK;
1713     }
1714
1715     VOL_UNLOCK;
1716
1717     return NULL;
1718 }
1719
1720 /* shut down all volumes on a given disk partition
1721  *
1722  * note that this function will not allow mp-fast
1723  * shutdown of a partition */
1724 int
1725 VShutdownByPartition_r(struct DiskPartition64 * dp)
1726 {
1727     int pass;
1728     int pass_stats[4];
1729     int total;
1730
1731     /* wait for other exclusive ops to finish */
1732     VVByPListWait_r(dp);
1733
1734     /* begin exclusive access */
1735     VVByPListBeginExclusive_r(dp);
1736
1737     /* pick the low-hanging fruit first,
1738      * then do the complicated ones last
1739      * (has the advantage of keeping
1740      *  in-use volumes up until the bitter end) */
1741     for (pass = 0, total=0; pass < 4; pass++) {
1742         pass_stats[pass] = ShutdownVByPForPass_r(dp, pass);
1743         total += pass_stats[pass];
1744     }
1745
1746     /* end exclusive access */
1747     VVByPListEndExclusive_r(dp);
1748
1749     Log("VShutdownByPartition:  shut down %d volumes on %s (pass[0]=%d, pass[1]=%d, pass[2]=%d, pass[3]=%d)\n",
1750         total, VPartitionPath(dp), pass_stats[0], pass_stats[1], pass_stats[2], pass_stats[3]);
1751
1752     return 0;
1753 }
1754
1755 /* internal shutdown functionality
1756  *
1757  * for multi-pass shutdown:
1758  * 0 to only "shutdown" {pre,un}attached and error state volumes
1759  * 1 to also shutdown attached volumes w/ volume header loaded
1760  * 2 to also shutdown attached volumes w/o volume header loaded
1761  * 3 to also shutdown exclusive state volumes
1762  *
1763  * caller MUST hold exclusive access on the hash chain
1764  * because we drop vol_glock_mutex internally
1765  *
1766  * this function is reentrant for passes 1--3
1767  * (e.g. multiple threads can cooperate to
1768  *  shutdown a partition mp-fast)
1769  *
1770  * pass 0 is not scaleable because the volume state data is
1771  * synchronized by vol_glock mutex, and the locking overhead
1772  * is too high to drop the lock long enough to do linked list
1773  * traversal
1774  */
1775 static int
1776 ShutdownVByPForPass_r(struct DiskPartition64 * dp, int pass)
1777 {
1778     struct rx_queue * q = queue_First(&dp->vol_list, rx_queue);
1779     int i = 0;
1780     const char *pass_strs[4] = {"{un/pre}attached vols", "vols w/ vol header loaded", "vols w/o vol header loaded", "vols with exclusive state"};
1781
1782     while (ShutdownVolumeWalk_r(dp, pass, &q)) {
1783         i++;
1784         if (0 == i%100) {
1785             Log("VShutdownByPartition:  ... shut down %d volumes on %s in pass %d (%s)\n", i, VPartitionPath(dp), pass, pass_strs[pass]);
1786         }
1787     }
1788
1789     return i;
1790 }
1791
1792 /* conditionally shutdown one volume on partition dp
1793  * returns 1 if a volume was shutdown in this pass,
1794  * 0 otherwise */
1795 static int
1796 ShutdownVolumeWalk_r(struct DiskPartition64 * dp, int pass,
1797                      struct rx_queue ** idx)
1798 {
1799     struct rx_queue *qp, *nqp;
1800     Volume * vp;
1801
1802     qp = *idx;
1803
1804     for (queue_ScanFrom(&dp->vol_list, qp, qp, nqp, rx_queue)) {
1805         vp = (Volume *) (((char *)qp) - offsetof(Volume, vol_list));
1806
1807         switch (pass) {
1808         case 0:
1809             if ((V_attachState(vp) != VOL_STATE_UNATTACHED) &&
1810                 (V_attachState(vp) != VOL_STATE_ERROR) &&
1811                 (V_attachState(vp) != VOL_STATE_DELETED) &&
1812                 (V_attachState(vp) != VOL_STATE_PREATTACHED)) {
1813                 break;
1814             }
1815         case 1:
1816             if ((V_attachState(vp) == VOL_STATE_ATTACHED) &&
1817                 (vp->header == NULL)) {
1818                 break;
1819             }
1820         case 2:
1821             if (VIsExclusiveState(V_attachState(vp))) {
1822                 break;
1823             }
1824         case 3:
1825             *idx = nqp;
1826             DeleteVolumeFromVByPList_r(vp);
1827             VShutdownVolume_r(vp);
1828             vp = NULL;
1829             return 1;
1830         }
1831     }
1832
1833     return 0;
1834 }
1835
1836 /*
1837  * shutdown a specific volume
1838  */
1839 /* caller MUST NOT hold a heavyweight ref on vp */
1840 int
1841 VShutdownVolume_r(Volume * vp)
1842 {
1843     int code;
1844
1845     VCreateReservation_r(vp);
1846
1847     if (LogLevel >= 5) {
1848         Log("VShutdownVolume_r:  vid=%" AFS_VOLID_FMT ", device=%d, state=%u\n",
1849             afs_printable_VolumeId_lu(vp->hashid), vp->partition->device,
1850             (unsigned int) V_attachState(vp));
1851     }
1852
1853     /* wait for other blocking ops to finish */
1854     VWaitExclusiveState_r(vp);
1855
1856     opr_Assert(VIsValidState(V_attachState(vp)));
1857
1858     switch(V_attachState(vp)) {
1859     case VOL_STATE_SALVAGING:
1860         /* Leave salvaging volumes alone. Any in-progress salvages will
1861          * continue working after viced shuts down. This is intentional.
1862          */
1863
1864     case VOL_STATE_PREATTACHED:
1865     case VOL_STATE_ERROR:
1866         VChangeState_r(vp, VOL_STATE_UNATTACHED);
1867     case VOL_STATE_UNATTACHED:
1868     case VOL_STATE_DELETED:
1869         break;
1870     case VOL_STATE_GOING_OFFLINE:
1871     case VOL_STATE_SHUTTING_DOWN:
1872     case VOL_STATE_ATTACHED:
1873         code = VHold_r(vp);
1874         if (!code) {
1875             if (LogLevel >= 5)
1876                 Log("VShutdown:  Attempting to take volume %" AFS_VOLID_FMT " offline.\n",
1877                     afs_printable_VolumeId_lu(vp->hashid));
1878
1879             /* take the volume offline (drops reference count) */
1880             VOffline_r(vp, "File server was shut down");
1881         }
1882         break;
1883     default:
1884         break;
1885     }
1886
1887     VCancelReservation_r(vp);
1888     vp = NULL;
1889     return 0;
1890 }
1891 #endif /* AFS_DEMAND_ATTACH_FS */
1892
1893
1894 /***************************************************/
1895 /* Header I/O routines                             */
1896 /***************************************************/
1897
1898 static const char *
1899 HeaderName(bit32 magic)
1900 {
1901     switch (magic) {
1902     case VOLUMEINFOMAGIC:
1903         return "volume info";
1904     case SMALLINDEXMAGIC:
1905         return "small index";
1906     case LARGEINDEXMAGIC:
1907         return "large index";
1908     case LINKTABLEMAGIC:
1909         return "link table";
1910     }
1911     return "unknown";
1912 }
1913
1914 /* open a descriptor for the inode (h),
1915  * read in an on-disk structure into buffer (to) of size (size),
1916  * verify versionstamp in structure has magic (magic) and
1917  * optionally verify version (version) if (version) is nonzero
1918  */
1919 static void
1920 ReadHeader(Error * ec, IHandle_t * h, char *to, int size, bit32 magic,
1921            bit32 version)
1922 {
1923     struct versionStamp *vsn;
1924     FdHandle_t *fdP;
1925     afs_sfsize_t nbytes;
1926     afs_ino_str_t stmp;
1927
1928     *ec = 0;
1929     if (h == NULL) {
1930         Log("ReadHeader: Null inode handle argument for %s header file.\n",
1931             HeaderName(magic));
1932         *ec = VSALVAGE;
1933         return;
1934     }
1935
1936     fdP = IH_OPEN(h);
1937     if (fdP == NULL) {
1938         Log("ReadHeader: Failed to open %s header file "
1939             "(volume=%" AFS_VOLID_FMT ", inode=%s); errno=%d\n", HeaderName(magic), afs_printable_VolumeId_lu(h->ih_vid),
1940             PrintInode(stmp, h->ih_ino), errno);
1941         *ec = VSALVAGE;
1942         return;
1943     }
1944
1945     vsn = (struct versionStamp *)to;
1946     nbytes = FDH_PREAD(fdP, to, size, 0);
1947     if (nbytes < 0) {
1948         Log("ReadHeader: Failed to read %s header file "
1949             "(volume=%" AFS_VOLID_FMT ", inode=%s); errno=%d\n", HeaderName(magic), afs_printable_VolumeId_lu(h->ih_vid),
1950             PrintInode(stmp, h->ih_ino), errno);
1951         *ec = VSALVAGE;
1952         FDH_REALLYCLOSE(fdP);
1953         return;
1954     }
1955     if (nbytes != size) {
1956         Log("ReadHeader: Incorrect number of bytes read from %s header file "
1957             "(volume=%" AFS_VOLID_FMT ", inode=%s); expected=%d, read=%d\n",
1958             HeaderName(magic), afs_printable_VolumeId_lu(h->ih_vid), 
1959             PrintInode(stmp, h->ih_ino), size, (int)nbytes);
1960         *ec = VSALVAGE;
1961         FDH_REALLYCLOSE(fdP);
1962         return;
1963     }
1964     if (vsn->magic != magic) {
1965         Log("ReadHeader: Incorrect magic for %s header file "
1966             "(volume=%" AFS_VOLID_FMT ", inode=%s); expected=0x%x, read=0x%x\n",
1967             HeaderName(magic), afs_printable_VolumeId_lu(h->ih_vid),
1968             PrintInode(stmp, h->ih_ino), magic, vsn->magic);
1969         *ec = VSALVAGE;
1970         FDH_REALLYCLOSE(fdP);
1971         return;
1972     }
1973
1974     FDH_CLOSE(fdP);
1975
1976     /* Check is conditional, in case caller wants to inspect version himself */
1977     if (version && vsn->version != version) {
1978         Log("ReadHeader: Incorrect version for %s header file "
1979             "(volume=%" AFS_VOLID_FMT ", inode=%s); expected=%x, read=%x\n",
1980             HeaderName(magic), afs_printable_VolumeId_lu(h->ih_vid), PrintInode(stmp, h->ih_ino),
1981             version, vsn->version);
1982         *ec = VSALVAGE;
1983     }
1984 }
1985
1986 void
1987 WriteVolumeHeader_r(Error * ec, Volume * vp)
1988 {
1989     IHandle_t *h = V_diskDataHandle(vp);
1990     FdHandle_t *fdP;
1991
1992     *ec = 0;
1993
1994     fdP = IH_OPEN(h);
1995     if (fdP == NULL) {
1996         *ec = VSALVAGE;
1997         return;
1998     }
1999     if (FDH_PWRITE(fdP, (char *)&V_disk(vp), sizeof(V_disk(vp)), 0)
2000         != sizeof(V_disk(vp))) {
2001         *ec = VSALVAGE;
2002         FDH_REALLYCLOSE(fdP);
2003         return;
2004     }
2005     FDH_CLOSE(fdP);
2006 }
2007
2008 /* VolumeHeaderToDisk
2009  * Allows for storing 64 bit inode numbers in on-disk volume header
2010  * file.
2011  */
2012 /* convert in-memory representation of a volume header to the
2013  * on-disk representation of a volume header */
2014 void
2015 VolumeHeaderToDisk(VolumeDiskHeader_t * dh, VolumeHeader_t * h)
2016 {
2017
2018     memset(dh, 0, sizeof(VolumeDiskHeader_t));
2019     dh->stamp = h->stamp;
2020     dh->id = h->id;
2021     dh->parent = h->parent;
2022
2023 #ifdef AFS_64BIT_IOPS_ENV
2024     dh->volumeInfo_lo = (afs_int32) h->volumeInfo & 0xffffffff;
2025     dh->volumeInfo_hi = (afs_int32) (h->volumeInfo >> 32) & 0xffffffff;
2026     dh->smallVnodeIndex_lo = (afs_int32) h->smallVnodeIndex & 0xffffffff;
2027     dh->smallVnodeIndex_hi =
2028         (afs_int32) (h->smallVnodeIndex >> 32) & 0xffffffff;
2029     dh->largeVnodeIndex_lo = (afs_int32) h->largeVnodeIndex & 0xffffffff;
2030     dh->largeVnodeIndex_hi =
2031         (afs_int32) (h->largeVnodeIndex >> 32) & 0xffffffff;
2032     dh->linkTable_lo = (afs_int32) h->linkTable & 0xffffffff;
2033     dh->linkTable_hi = (afs_int32) (h->linkTable >> 32) & 0xffffffff;
2034 #else
2035     dh->volumeInfo_lo = h->volumeInfo;
2036     dh->smallVnodeIndex_lo = h->smallVnodeIndex;
2037     dh->largeVnodeIndex_lo = h->largeVnodeIndex;
2038     dh->linkTable_lo = h->linkTable;
2039 #endif
2040 }
2041
2042 /* DiskToVolumeHeader
2043  * Converts an on-disk representation of a volume header to
2044  * the in-memory representation of a volume header.
2045  *
2046  * Makes the assumption that AFS has *always*
2047  * zero'd the volume header file so that high parts of inode
2048  * numbers are 0 in older (SGI EFS) volume header files.
2049  */
2050 void
2051 DiskToVolumeHeader(VolumeHeader_t * h, VolumeDiskHeader_t * dh)
2052 {
2053     memset(h, 0, sizeof(VolumeHeader_t));
2054     h->stamp = dh->stamp;
2055     h->id = dh->id;
2056     h->parent = dh->parent;
2057
2058 #ifdef AFS_64BIT_IOPS_ENV
2059     h->volumeInfo =
2060         (Inode) dh->volumeInfo_lo | ((Inode) dh->volumeInfo_hi << 32);
2061
2062     h->smallVnodeIndex =
2063         (Inode) dh->smallVnodeIndex_lo | ((Inode) dh->
2064                                           smallVnodeIndex_hi << 32);
2065
2066     h->largeVnodeIndex =
2067         (Inode) dh->largeVnodeIndex_lo | ((Inode) dh->
2068                                           largeVnodeIndex_hi << 32);
2069     h->linkTable =
2070         (Inode) dh->linkTable_lo | ((Inode) dh->linkTable_hi << 32);
2071 #else
2072     h->volumeInfo = dh->volumeInfo_lo;
2073     h->smallVnodeIndex = dh->smallVnodeIndex_lo;
2074     h->largeVnodeIndex = dh->largeVnodeIndex_lo;
2075     h->linkTable = dh->linkTable_lo;
2076 #endif
2077 }
2078
2079
2080 /***************************************************/
2081 /* Volume Attachment routines                      */
2082 /***************************************************/
2083
2084 #ifdef AFS_DEMAND_ATTACH_FS
2085 /**
2086  * pre-attach a volume given its path.
2087  *
2088  * @param[out] ec         outbound error code
2089  * @param[in]  partition  partition path string
2090  * @param[in]  name       volume id string
2091  *
2092  * @return volume object pointer
2093  *
2094  * @note A pre-attached volume will only have its partition
2095  *       and hashid fields initialized.  At first call to
2096  *       VGetVolume, the volume will be fully attached.
2097  *
2098  */
2099 Volume *
2100 VPreAttachVolumeByName(Error * ec, char *partition, char *name)
2101 {
2102     Volume * vp;
2103     VOL_LOCK;
2104     vp = VPreAttachVolumeByName_r(ec, partition, name);
2105     VOL_UNLOCK;
2106     return vp;
2107 }
2108
2109 /**
2110  * pre-attach a volume given its path.
2111  *
2112  * @param[out] ec         outbound error code
2113  * @param[in]  partition  path to vice partition
2114  * @param[in]  name       volume id string
2115  *
2116  * @return volume object pointer
2117  *
2118  * @pre VOL_LOCK held
2119  *
2120  * @internal volume package internal use only.
2121  */
2122 Volume *
2123 VPreAttachVolumeByName_r(Error * ec, char *partition, char *name)
2124 {
2125     return VPreAttachVolumeById_r(ec,
2126                                   partition,
2127                                   VolumeNumber(name));
2128 }
2129
2130 /**
2131  * pre-attach a volume given its path and numeric volume id.
2132  *
2133  * @param[out] ec          error code return
2134  * @param[in]  partition   path to vice partition
2135  * @param[in]  volumeId    numeric volume id
2136  *
2137  * @return volume object pointer
2138  *
2139  * @pre VOL_LOCK held
2140  *
2141  * @internal volume package internal use only.
2142  */
2143 Volume *
2144 VPreAttachVolumeById_r(Error * ec,
2145                        char * partition,
2146                        VolumeId volumeId)
2147 {
2148     Volume *vp;
2149     struct DiskPartition64 *partp;
2150
2151     *ec = 0;
2152
2153     opr_Assert(programType == fileServer);
2154
2155     if (!(partp = VGetPartition_r(partition, 0))) {
2156         *ec = VNOVOL;
2157         Log("VPreAttachVolumeById_r:  Error getting partition (%s)\n", partition);
2158         return NULL;
2159     }
2160
2161     /* ensure that any vp we pass to VPreAttachVolumeByVp_r
2162      * is NOT in exclusive state.
2163      */
2164  retry:
2165     vp = VLookupVolume_r(ec, volumeId, NULL);
2166
2167     if (*ec) {
2168         return NULL;
2169     }
2170
2171     if (vp && VIsExclusiveState(V_attachState(vp))) {
2172         VCreateReservation_r(vp);
2173         VWaitExclusiveState_r(vp);
2174         VCancelReservation_r(vp);
2175         vp = NULL;
2176         goto retry;    /* look up volume again */
2177     }
2178
2179     /* vp == NULL or vp not exclusive both OK */
2180
2181     return VPreAttachVolumeByVp_r(ec, partp, vp, volumeId);
2182 }
2183
2184 /**
2185  * preattach a volume.
2186  *
2187  * @param[out] ec     outbound error code
2188  * @param[in]  partp  pointer to partition object
2189  * @param[in]  vp     pointer to volume object
2190  * @param[in]  vid    volume id
2191  *
2192  * @return volume object pointer
2193  *
2194  * @pre VOL_LOCK is held.
2195  *
2196  * @pre vp (if specified) must not be in exclusive state.
2197  *
2198  * @warning Returned volume object pointer does not have to
2199  *          equal the pointer passed in as argument vp.  There
2200  *          are potential race conditions which can result in
2201  *          the pointers having different values.  It is up to
2202  *          the caller to make sure that references are handled
2203  *          properly in this case.
2204  *
2205  * @note If there is already a volume object registered with
2206  *       the same volume id, its pointer MUST be passed as
2207  *       argument vp.  Failure to do so will result in a silent
2208  *       failure to preattach.
2209  *
2210  * @internal volume package internal use only.
2211  */
2212 Volume *
2213 VPreAttachVolumeByVp_r(Error * ec,
2214                        struct DiskPartition64 * partp,
2215                        Volume * vp,
2216                        VolumeId vid)
2217 {
2218     Volume *nvp = NULL;
2219
2220     *ec = 0;
2221
2222     /* don't proceed unless it's safe */
2223     if (vp) {
2224         opr_Assert(!VIsExclusiveState(V_attachState(vp)));
2225     }
2226
2227     /* check to see if pre-attach already happened */
2228     if (vp &&
2229         (V_attachState(vp) != VOL_STATE_UNATTACHED) &&
2230         (V_attachState(vp) != VOL_STATE_DELETED) &&
2231         (V_attachState(vp) != VOL_STATE_PREATTACHED) &&
2232         !VIsErrorState(V_attachState(vp))) {
2233         /*
2234          * pre-attach is a no-op in all but the following cases:
2235          *
2236          *   - volume is unattached
2237          *   - volume is in an error state
2238          *   - volume is pre-attached
2239          */
2240         Log("VPreattachVolumeByVp_r: volume %" AFS_VOLID_FMT " not in quiescent state (state %u flags 0x%x)\n",
2241             afs_printable_VolumeId_lu(vid), V_attachState(vp),
2242             V_attachFlags(vp));
2243         goto done;
2244     } else if (vp) {
2245         /* we're re-attaching a volume; clear out some old state */
2246         memset(&vp->salvage, 0, sizeof(struct VolumeOnlineSalvage));
2247
2248         if (V_partition(vp) != partp) {
2249             /* XXX potential race */
2250             DeleteVolumeFromVByPList_r(vp);
2251         }
2252     } else {
2253         /* if we need to allocate a new Volume struct,
2254          * go ahead and drop the vol glock, otherwise
2255          * do the basic setup synchronised, as it's
2256          * probably not worth dropping the lock */
2257         VOL_UNLOCK;
2258
2259         /* allocate the volume structure */
2260         vp = nvp = calloc(1, sizeof(Volume));
2261         opr_Assert(vp != NULL);
2262         queue_Init(&vp->vnode_list);
2263         queue_Init(&vp->rx_call_list);
2264         opr_cv_init(&V_attachCV(vp));
2265     }
2266
2267     /* link the volume with its associated vice partition */
2268     vp->device = partp->device;
2269     vp->partition = partp;
2270
2271     vp->hashid = vid;
2272     vp->specialStatus = 0;
2273
2274     /* if we dropped the lock, reacquire the lock,
2275      * check for pre-attach races, and then add
2276      * the volume to the hash table */
2277     if (nvp) {
2278         VOL_LOCK;
2279         nvp = VLookupVolume_r(ec, vid, NULL);
2280         if (*ec) {
2281             free(vp);
2282             vp = NULL;
2283             goto done;
2284         } else if (nvp) { /* race detected */
2285             free(vp);
2286             vp = nvp;
2287             goto done;
2288         } else {
2289           /* hack to make up for VChangeState_r() decrementing
2290            * the old state counter */
2291           VStats.state_levels[0]++;
2292         }
2293     }
2294
2295     /* put pre-attached volume onto the hash table
2296      * and bring it up to the pre-attached state */
2297     AddVolumeToHashTable(vp, vp->hashid);
2298     AddVolumeToVByPList_r(vp);
2299     VLRU_Init_Node_r(vp);
2300     VChangeState_r(vp, VOL_STATE_PREATTACHED);
2301
2302     if (LogLevel >= 5)
2303         Log("VPreAttachVolumeByVp_r:  volume %" AFS_VOLID_FMT " pre-attached\n", afs_printable_VolumeId_lu(vp->hashid));
2304
2305   done:
2306     if (*ec)
2307         return NULL;
2308     else
2309         return vp;
2310 }
2311 #endif /* AFS_DEMAND_ATTACH_FS */
2312
2313 /* Attach an existing volume, given its pathname, and return a
2314    pointer to the volume header information.  The volume also
2315    normally goes online at this time.  An offline volume
2316    must be reattached to make it go online */
2317 Volume *
2318 VAttachVolumeByName(Error * ec, char *partition, char *name, int mode)
2319 {
2320     Volume *retVal;
2321     VOL_LOCK;
2322     retVal = VAttachVolumeByName_r(ec, partition, name, mode);
2323     VOL_UNLOCK;
2324     return retVal;
2325 }
2326
2327 Volume *
2328 VAttachVolumeByName_r(Error * ec, char *partition, char *name, int mode)
2329 {
2330     Volume *vp = NULL;
2331     struct DiskPartition64 *partp;
2332     char path[64];
2333     int isbusy = 0;
2334     VolumeId volumeId;
2335     int checkedOut;
2336 #ifdef AFS_DEMAND_ATTACH_FS
2337     VolumeStats stats_save;
2338     Volume *svp = NULL;
2339 #endif /* AFS_DEMAND_ATTACH_FS */
2340
2341     *ec = 0;
2342
2343     volumeId = VolumeNumber(name);
2344
2345     if (!(partp = VGetPartition_r(partition, 0))) {
2346         *ec = VNOVOL;
2347         Log("VAttachVolume: Error getting partition (%s)\n", partition);
2348         goto done;
2349     }
2350
2351     if (VRequiresPartLock()) {
2352         opr_Assert(VInit == 3);
2353         VLockPartition_r(partition);
2354     } else if (programType == fileServer) {
2355 #ifdef AFS_DEMAND_ATTACH_FS
2356         /* lookup the volume in the hash table */
2357         vp = VLookupVolume_r(ec, volumeId, NULL);
2358         if (*ec) {
2359             return NULL;
2360         }
2361
2362         if (vp) {
2363             /* save any counters that are supposed to
2364              * be monotonically increasing over the
2365              * lifetime of the fileserver */
2366             memcpy(&stats_save, &vp->stats, sizeof(VolumeStats));
2367         } else {
2368             memset(&stats_save, 0, sizeof(VolumeStats));
2369         }
2370
2371         /* if there's something in the hash table, and it's not
2372          * in the pre-attach state, then we may need to detach
2373          * it before proceeding */
2374         if (vp && (V_attachState(vp) != VOL_STATE_PREATTACHED)) {
2375             VCreateReservation_r(vp);
2376             VWaitExclusiveState_r(vp);
2377
2378             /* at this point state must be one of:
2379              *   - UNATTACHED
2380              *   - ATTACHED
2381              *   - SHUTTING_DOWN
2382              *   - GOING_OFFLINE
2383              *   - SALVAGING
2384              *   - ERROR
2385              *   - DELETED
2386              */
2387
2388             if (vp->specialStatus == VBUSY)
2389                 isbusy = 1;
2390
2391             /* if it's already attached, see if we can return it */
2392             if (V_attachState(vp) == VOL_STATE_ATTACHED) {
2393                 VGetVolumeByVp_r(ec, vp);
2394                 if (V_inUse(vp) == fileServer) {
2395                     VCancelReservation_r(vp);
2396                     return vp;
2397                 }
2398
2399                 /* otherwise, we need to detach, and attempt to re-attach */
2400                 VDetachVolume_r(ec, vp);
2401                 if (*ec) {
2402                     Log("VAttachVolume: Error detaching old volume instance (%s)\n", name);
2403                 }
2404             } else {
2405                 /* if it isn't fully attached, delete from the hash tables,
2406                    and let the refcounter handle the rest */
2407                 DeleteVolumeFromHashTable(vp);
2408                 DeleteVolumeFromVByPList_r(vp);
2409             }
2410
2411             VCancelReservation_r(vp);
2412             vp = NULL;
2413         }
2414
2415         /* pre-attach volume if it hasn't been done yet */
2416         if (!vp ||
2417             (V_attachState(vp) == VOL_STATE_UNATTACHED) ||
2418             (V_attachState(vp) == VOL_STATE_DELETED) ||
2419             (V_attachState(vp) == VOL_STATE_ERROR)) {
2420             svp = vp;
2421             vp = VPreAttachVolumeByVp_r(ec, partp, vp, volumeId);
2422             if (*ec) {
2423                 return NULL;
2424             }
2425         }
2426
2427         opr_Assert(vp != NULL);
2428
2429         /* handle pre-attach races
2430          *
2431          * multiple threads can race to pre-attach a volume,
2432          * but we can't let them race beyond that
2433          *
2434          * our solution is to let the first thread to bring
2435          * the volume into an exclusive state win; the other
2436          * threads just wait until it finishes bringing the
2437          * volume online, and then they do a vgetvolumebyvp
2438          */
2439         if (svp && (svp != vp)) {
2440             /* wait for other exclusive ops to finish */
2441             VCreateReservation_r(vp);
2442             VWaitExclusiveState_r(vp);
2443
2444             /* get a heavyweight ref, kill the lightweight ref, and return */
2445             VGetVolumeByVp_r(ec, vp);
2446             VCancelReservation_r(vp);
2447             return vp;
2448         }
2449
2450         /* at this point, we are chosen as the thread to do
2451          * demand attachment for this volume. all other threads
2452          * doing a getvolume on vp->hashid will block until we finish */
2453
2454         /* make sure any old header cache entries are invalidated
2455          * before proceeding */
2456         FreeVolumeHeader(vp);
2457
2458         VChangeState_r(vp, VOL_STATE_ATTACHING);
2459
2460         /* restore any saved counters */
2461         memcpy(&vp->stats, &stats_save, sizeof(VolumeStats));
2462 #else /* AFS_DEMAND_ATTACH_FS */
2463         vp = VGetVolume_r(ec, volumeId);
2464         if (vp) {
2465             if (V_inUse(vp) == fileServer)
2466                 return vp;
2467             if (vp->specialStatus == VBUSY)
2468                 isbusy = 1;
2469             VDetachVolume_r(ec, vp);
2470             if (*ec) {
2471                 Log("VAttachVolume: Error detaching volume (%s)\n", name);
2472             }
2473             vp = NULL;
2474         }
2475 #endif /* AFS_DEMAND_ATTACH_FS */
2476     }
2477
2478     *ec = 0;
2479     strcpy(path, VPartitionPath(partp));
2480
2481     VOL_UNLOCK;
2482
2483     strcat(path, OS_DIRSEP);
2484     strcat(path, name);
2485
2486     if (!vp) {
2487       vp = (Volume *) calloc(1, sizeof(Volume));
2488       opr_Assert(vp != NULL);
2489       vp->hashid = volumeId;
2490       vp->device = partp->device;
2491       vp->partition = partp;
2492       queue_Init(&vp->vnode_list);
2493       queue_Init(&vp->rx_call_list);
2494 #ifdef AFS_DEMAND_ATTACH_FS
2495       opr_cv_init(&V_attachCV(vp));
2496 #endif /* AFS_DEMAND_ATTACH_FS */
2497     }
2498
2499     /* attach2 is entered without any locks, and returns
2500      * with vol_glock_mutex held */
2501     vp = attach2(ec, volumeId, path, partp, vp, isbusy, mode, &checkedOut);
2502
2503     if (VCanUseFSSYNC() && vp) {
2504 #ifdef AFS_DEMAND_ATTACH_FS
2505         if ((mode == V_VOLUPD) || (VolumeWriteable(vp) && (mode == V_CLONE))) {
2506             /* mark volume header as in use so that volser crashes lead to a
2507              * salvage attempt */
2508             VUpdateVolume_r(ec, vp, 0);
2509         }
2510         /* for dafs, we should tell the fileserver, except for V_PEEK
2511          * where we know it is not necessary */
2512         if (mode == V_PEEK) {
2513             vp->needsPutBack = 0;
2514         } else {
2515             vp->needsPutBack = VOL_PUTBACK;
2516         }
2517 #else /* !AFS_DEMAND_ATTACH_FS */
2518         /* duplicate computation in fssync.c about whether the server
2519          * takes the volume offline or not.  If the volume isn't
2520          * offline, we must not return it when we detach the volume,
2521          * or the server will abort */
2522         if (mode == V_READONLY || mode == V_PEEK
2523             || (!VolumeWriteable(vp) && (mode == V_CLONE || mode == V_DUMP)))
2524             vp->needsPutBack = 0;
2525         else
2526             vp->needsPutBack = VOL_PUTBACK;
2527 #endif /* !AFS_DEMAND_ATTACH_FS */
2528     }
2529 #ifdef FSSYNC_BUILD_CLIENT
2530     /* Only give back the vol to the fileserver if we checked it out; attach2
2531      * will set checkedOut only if we successfully checked it out from the
2532      * fileserver. */
2533     if (VCanUseFSSYNC() && vp == NULL && checkedOut) {
2534
2535 #ifdef AFS_DEMAND_ATTACH_FS
2536         /* If we couldn't attach but we scheduled a salvage, we already
2537          * notified the fileserver; don't online it now */
2538         if (*ec != VSALVAGING)
2539 #endif /* AFS_DEMAND_ATTACH_FS */
2540         FSYNC_VolOp(volumeId, partition, FSYNC_VOL_ON, 0, NULL);
2541     } else
2542 #endif
2543     if (programType == fileServer && vp) {
2544 #ifdef AFS_DEMAND_ATTACH_FS
2545         /*
2546          * we can get here in cases where we don't "own"
2547          * the volume (e.g. volume owned by a utility).
2548          * short circuit around potential disk header races.
2549          */
2550         if (V_attachState(vp) != VOL_STATE_ATTACHED) {
2551             goto done;
2552         }
2553 #endif
2554         VUpdateVolume_r(ec, vp, 0);
2555         if (*ec) {
2556             Log("VAttachVolume: Error updating volume\n");
2557             if (vp)
2558                 VPutVolume_r(vp);
2559             goto done;
2560         }
2561         if (VolumeWriteable(vp) && V_dontSalvage(vp) == 0) {
2562 #ifndef AFS_DEMAND_ATTACH_FS
2563             /* This is a hack: by temporarily setting the incore
2564              * dontSalvage flag ON, the volume will be put back on the
2565              * Update list (with dontSalvage OFF again).  It will then
2566              * come back in N minutes with DONT_SALVAGE eventually
2567              * set.  This is the way that volumes that have never had
2568              * it set get it set; or that volumes that have been
2569              * offline without DONT SALVAGE having been set also
2570              * eventually get it set */
2571             V_dontSalvage(vp) = DONT_SALVAGE;
2572 #endif /* !AFS_DEMAND_ATTACH_FS */
2573             VAddToVolumeUpdateList_r(ec, vp);
2574             if (*ec) {
2575                 Log("VAttachVolume: Error adding volume to update list\n");
2576                 if (vp)
2577                     VPutVolume_r(vp);
2578                 goto done;
2579             }
2580         }
2581         if (LogLevel)
2582           Log("VOnline:  volume %" AFS_VOLID_FMT " (%s) attached and online\n", afs_printable_VolumeId_lu(V_id(vp)),
2583                 V_name(vp));
2584     }
2585
2586   done:
2587     if (VRequiresPartLock()) {
2588         VUnlockPartition_r(partition);
2589     }
2590     if (*ec) {
2591 #ifdef AFS_DEMAND_ATTACH_FS
2592         /* attach failed; make sure we're in error state */
2593         if (vp && !VIsErrorState(V_attachState(vp))) {
2594             VChangeState_r(vp, VOL_STATE_ERROR);
2595         }
2596 #endif /* AFS_DEMAND_ATTACH_FS */
2597         return NULL;
2598     } else {
2599         return vp;
2600     }
2601 }
2602
2603 #ifdef AFS_DEMAND_ATTACH_FS
2604 /* VAttachVolumeByVp_r
2605  *
2606  * finish attaching a volume that is
2607  * in a less than fully attached state
2608  */
2609 /* caller MUST hold a ref count on vp */
2610 static Volume *
2611 VAttachVolumeByVp_r(Error * ec, Volume * vp, int mode)
2612 {
2613     char name[VMAXPATHLEN];
2614     int reserve = 0;
2615     struct DiskPartition64 *partp;
2616     char path[64];
2617     int isbusy = 0;
2618     VolumeId volumeId;
2619     Volume * nvp = NULL;
2620     VolumeStats stats_save;
2621     int checkedOut;
2622     *ec = 0;
2623
2624     /* volume utility should never call AttachByVp */
2625     opr_Assert(programType == fileServer);
2626
2627     volumeId = vp->hashid;
2628     partp = vp->partition;
2629     VolumeExternalName_r(volumeId, name, sizeof(name));
2630
2631
2632     /* if another thread is performing a blocking op, wait */
2633     VWaitExclusiveState_r(vp);
2634
2635     memcpy(&stats_save, &vp->stats, sizeof(VolumeStats));
2636
2637     /* if it's already attached, see if we can return it */
2638     if (V_attachState(vp) == VOL_STATE_ATTACHED) {
2639         VGetVolumeByVp_r(ec, vp);
2640         if (V_inUse(vp) == fileServer) {
2641             return vp;
2642         } else {
2643             if (vp->specialStatus == VBUSY)
2644                 isbusy = 1;
2645             VDetachVolume_r(ec, vp);
2646             if (*ec) {
2647                 Log("VAttachVolume: Error detaching volume (%s)\n", name);
2648             }
2649             vp = NULL;
2650         }
2651     }
2652
2653     /* pre-attach volume if it hasn't been done yet */
2654     if (!vp ||
2655         (V_attachState(vp) == VOL_STATE_UNATTACHED) ||
2656         (V_attachState(vp) == VOL_STATE_DELETED) ||
2657         (V_attachState(vp) == VOL_STATE_ERROR)) {
2658         nvp = VPreAttachVolumeByVp_r(ec, partp, vp, volumeId);
2659         if (*ec) {
2660             return NULL;
2661         }
2662         if (nvp != vp) {
2663             reserve = 1;
2664             VCreateReservation_r(nvp);
2665             vp = nvp;
2666         }
2667     }
2668
2669     opr_Assert(vp != NULL);
2670     VChangeState_r(vp, VOL_STATE_ATTACHING);
2671
2672     /* restore monotonically increasing stats */
2673     memcpy(&vp->stats, &stats_save, sizeof(VolumeStats));
2674
2675     *ec = 0;
2676
2677     /* compute path to disk header */
2678     strcpy(path, VPartitionPath(partp));
2679
2680     VOL_UNLOCK;
2681
2682     strcat(path, OS_DIRSEP);
2683     strcat(path, name);
2684
2685     /* do volume attach
2686      *
2687      * NOTE: attach2 is entered without any locks, and returns
2688      * with vol_glock_mutex held */
2689     vp = attach2(ec, volumeId, path, partp, vp, isbusy, mode, &checkedOut);
2690
2691     /*
2692      * the event that an error was encountered, or
2693      * the volume was not brought to an attached state
2694      * for any reason, skip to the end.  We cannot
2695      * safely call VUpdateVolume unless we "own" it.
2696      */
2697     if (*ec ||
2698         (vp == NULL) ||
2699         (V_attachState(vp) != VOL_STATE_ATTACHED)) {
2700         goto done;
2701     }
2702
2703     VUpdateVolume_r(ec, vp, 0);
2704     if (*ec) {
2705         Log("VAttachVolume: Error updating volume %" AFS_VOLID_FMT "\n",
2706             afs_printable_VolumeId_lu(vp->hashid));
2707         VPutVolume_r(vp);
2708         goto done;
2709     }
2710     if (VolumeWriteable(vp) && V_dontSalvage(vp) == 0) {
2711 #ifndef AFS_DEMAND_ATTACH_FS
2712         /* This is a hack: by temporarily setting the incore
2713          * dontSalvage flag ON, the volume will be put back on the
2714          * Update list (with dontSalvage OFF again).  It will then
2715          * come back in N minutes with DONT_SALVAGE eventually
2716          * set.  This is the way that volumes that have never had
2717          * it set get it set; or that volumes that have been
2718          * offline without DONT SALVAGE having been set also
2719          * eventually get it set */
2720         V_dontSalvage(vp) = DONT_SALVAGE;
2721 #endif /* !AFS_DEMAND_ATTACH_FS */
2722         VAddToVolumeUpdateList_r(ec, vp);
2723         if (*ec) {
2724             Log("VAttachVolume: Error adding volume %" AFS_VOLID_FMT " to update list\n",
2725                 afs_printable_VolumeId_lu(vp->hashid));
2726             if (vp)
2727                 VPutVolume_r(vp);
2728             goto done;
2729         }
2730     }
2731     if (LogLevel)
2732         Log("VOnline:  volume %" AFS_VOLID_FMT " (%s) attached and online\n",
2733             afs_printable_VolumeId_lu(V_id(vp)), V_name(vp));
2734   done:
2735     if (reserve) {
2736         VCancelReservation_r(nvp);
2737         reserve = 0;
2738     }
2739     if (*ec && (*ec != VOFFLINE) && (*ec != VSALVAGE)) {
2740         if (vp && !VIsErrorState(V_attachState(vp))) {
2741             VChangeState_r(vp, VOL_STATE_ERROR);
2742         }
2743         return NULL;
2744     } else {
2745         return vp;
2746     }
2747 }
2748
2749 /**
2750  * lock a volume on disk (non-blocking).
2751  *
2752  * @param[in] vp  The volume to lock
2753  * @param[in] locktype READ_LOCK or WRITE_LOCK
2754  *
2755  * @return operation status
2756  *  @retval 0 success, lock was obtained
2757  *  @retval EBUSY a conflicting lock was held by another process
2758  *  @retval EIO   error acquiring lock
2759  *
2760  * @pre If we're in the fileserver, vp is in an exclusive state
2761  *
2762  * @pre vp is not already locked
2763  */
2764 static int
2765 VLockVolumeNB(Volume *vp, int locktype)
2766 {
2767     int code;
2768
2769     opr_Assert(programType != fileServer
2770                || VIsExclusiveState(V_attachState(vp)));
2771     opr_Assert(!(V_attachFlags(vp) & VOL_LOCKED));
2772
2773     code = VLockVolumeByIdNB(vp->hashid, vp->partition, locktype);
2774     if (code == 0) {
2775         V_attachFlags(vp) |= VOL_LOCKED;
2776     }
2777
2778     return code;
2779 }
2780
2781 /**
2782  * unlock a volume on disk that was locked with VLockVolumeNB.
2783  *
2784  * @param[in] vp  volume to unlock
2785  *
2786  * @pre If we're in the fileserver, vp is in an exclusive state
2787  *
2788  * @pre vp has already been locked
2789  */
2790 static void
2791 VUnlockVolume(Volume *vp)
2792 {
2793     opr_Assert(programType != fileServer
2794                || VIsExclusiveState(V_attachState(vp)));
2795     opr_Assert((V_attachFlags(vp) & VOL_LOCKED));
2796
2797     VUnlockVolumeById(vp->hashid, vp->partition);
2798
2799     V_attachFlags(vp) &= ~VOL_LOCKED;
2800 }
2801 #endif /* AFS_DEMAND_ATTACH_FS */
2802
2803 /**
2804  * read in a vol header, possibly lock the vol header, and possibly check out
2805  * the vol header from the fileserver, as part of volume attachment.
2806  *
2807  * @param[out] ec     error code
2808  * @param[in] vp      volume pointer object
2809  * @param[in] partp   disk partition object of the attaching partition
2810  * @param[in] mode    attachment mode such as V_VOLUPD, V_DUMP, etc (see
2811  *                    volume.h)
2812  * @param[in] peek    1 to just try to read in the volume header and make sure
2813  *                    we don't try to lock the vol, or check it out from
2814  *                    FSSYNC or anything like that; 0 otherwise, for 'normal'
2815  *                    operation
2816  * @param[out] acheckedOut   If we successfully checked-out the volume from
2817  *                           the fileserver (if we needed to), this is set
2818  *                           to 1, otherwise it is untouched.
2819  *
2820  * @note As part of DAFS volume attachment, the volume header may be either
2821  *       read- or write-locked to ensure mutual exclusion of certain volume
2822  *       operations. In some cases in order to determine whether we need to
2823  *       read- or write-lock the header, we need to read in the header to see
2824  *       if the volume is RW or not. So, if we read in the header under a
2825  *       read-lock and determine that we actually need a write-lock on the
2826  *       volume header, this function will drop the read lock, acquire a write
2827  *       lock, and read the header in again.
2828  */
2829 static void
2830 attach_volume_header(Error *ec, Volume *vp, struct DiskPartition64 *partp,
2831                      int mode, int peek, int *acheckedOut)
2832 {
2833     struct VolumeDiskHeader diskHeader;
2834     struct VolumeHeader header;
2835     int code;
2836     int first_try = 1;
2837     int lock_tries = 0, checkout_tries = 0;
2838     int retry;
2839     VolumeId volid = vp->hashid;
2840 #ifdef FSSYNC_BUILD_CLIENT
2841     int checkout, done_checkout = 0;
2842 #endif /* FSSYNC_BUILD_CLIENT */
2843 #ifdef AFS_DEMAND_ATTACH_FS
2844     int locktype = 0, use_locktype = -1;
2845 #endif /* AFS_DEMAND_ATTACH_FS */
2846
2847  retry:
2848     retry = 0;
2849     *ec = 0;
2850
2851     if (lock_tries > VOL_MAX_CHECKOUT_RETRIES) {
2852         Log("VAttachVolume: retried too many times trying to lock header for "
2853             "vol %lu part %s; giving up\n", afs_printable_uint32_lu(volid),
2854             VPartitionPath(partp));
2855         *ec = VNOVOL;
2856         goto done;
2857     }
2858     if (checkout_tries > VOL_MAX_CHECKOUT_RETRIES) {
2859         Log("VAttachVolume: retried too many times trying to checkout "
2860             "vol %lu part %s; giving up\n", afs_printable_uint32_lu(volid),
2861             VPartitionPath(partp));
2862         *ec = VNOVOL;
2863         goto done;
2864     }
2865
2866     if (VReadVolumeDiskHeader(volid, partp, NULL)) {
2867         /* short-circuit the 'volume does not exist' case */
2868         *ec = VNOVOL;
2869         goto done;
2870     }
2871
2872 #ifdef FSSYNC_BUILD_CLIENT
2873     checkout = !done_checkout;
2874     done_checkout = 1;
2875     if (!peek && checkout && VMustCheckoutVolume(mode)) {
2876         SYNC_response res;
2877         memset(&res, 0, sizeof(res));
2878
2879         if (FSYNC_VolOp(volid, partp->name, FSYNC_VOL_NEEDVOLUME, mode, &res)
2880             != SYNC_OK) {
2881
2882             if (res.hdr.reason == FSYNC_SALVAGE) {
2883                 Log("VAttachVolume: file server says volume %lu is salvaging\n",
2884                      afs_printable_uint32_lu(volid));
2885                 *ec = VSALVAGING;
2886             } else {
2887                 Log("VAttachVolume: attach of volume %lu apparently denied by file server\n",
2888                      afs_printable_uint32_lu(volid));
2889                 *ec = VNOVOL;   /* XXXX */
2890             }
2891             goto done;
2892         }
2893         *acheckedOut = 1;
2894     }
2895 #endif
2896
2897 #ifdef AFS_DEMAND_ATTACH_FS
2898     if (use_locktype < 0) {
2899         /* don't know whether vol is RO or RW; assume it's RO and we can retry
2900          * if it turns out to be RW */
2901         locktype = VVolLockType(mode, 0);
2902
2903     } else {
2904         /* a previous try says we should use use_locktype to lock the volume,
2905          * so use that */
2906         locktype = use_locktype;
2907     }
2908
2909     if (!peek && locktype) {
2910         code = VLockVolumeNB(vp, locktype);
2911         if (code) {
2912             if (code == EBUSY) {
2913                 Log("VAttachVolume: another program has vol %lu locked\n",
2914                     afs_printable_uint32_lu(volid));
2915             } else {
2916                 Log("VAttachVolume: error %d trying to lock vol %lu\n",
2917                     code, afs_printable_uint32_lu(volid));
2918             }
2919
2920             *ec = VNOVOL;
2921             goto done;
2922         }
2923     }
2924 #endif /* AFS_DEMAND_ATTACH_FS */
2925
2926     code = VReadVolumeDiskHeader(volid, partp, &diskHeader);
2927     if (code) {
2928         if (code == EIO) {
2929             *ec = VSALVAGE;
2930         } else {
2931             *ec = VNOVOL;
2932         }
2933         goto done;
2934     }
2935
2936     DiskToVolumeHeader(&header, &diskHeader);
2937
2938     IH_INIT(vp->vnodeIndex[vLarge].handle, partp->device, header.parent,
2939             header.largeVnodeIndex);
2940     IH_INIT(vp->vnodeIndex[vSmall].handle, partp->device, header.parent,
2941             header.smallVnodeIndex);
2942     IH_INIT(vp->diskDataHandle, partp->device, header.parent,
2943             header.volumeInfo);
2944     IH_INIT(vp->linkHandle, partp->device, header.parent, header.linkTable);
2945
2946     if (first_try) {
2947         /* only need to do this once */
2948         VOL_LOCK;
2949         GetVolumeHeader(vp);
2950         VOL_UNLOCK;
2951     }
2952
2953 #if defined(AFS_DEMAND_ATTACH_FS) && defined(FSSYNC_BUILD_CLIENT)
2954     /* demand attach changes the V_PEEK mechanism
2955      *
2956      * we can now suck the current disk data structure over
2957      * the fssync interface without going to disk
2958      *
2959      * (technically, we don't need to restrict this feature
2960      *  to demand attach fileservers.  However, I'm trying
2961      *  to limit the number of common code changes)
2962      */
2963     if (VCanUseFSSYNC() && (mode == V_PEEK || peek)) {
2964         SYNC_response res;
2965         res.payload.len = sizeof(VolumeDiskData);
2966         res.payload.buf = &(V_disk(vp));
2967
2968         if (FSYNC_VolOp(vp->hashid,
2969                         partp->name,
2970                         FSYNC_VOL_QUERY_HDR,
2971                         FSYNC_WHATEVER,
2972                         &res) == SYNC_OK) {
2973             goto disk_header_loaded;
2974         }
2975     }
2976 #endif /* AFS_DEMAND_ATTACH_FS && FSSYNC_BUILD_CLIENT */
2977     (void)ReadHeader(ec, V_diskDataHandle(vp), (char *)&V_disk(vp),
2978                      sizeof(V_disk(vp)), VOLUMEINFOMAGIC, VOLUMEINFOVERSION);
2979
2980 #ifdef AFS_DEMAND_ATTACH_FS
2981     /* update stats */
2982     VOL_LOCK;
2983     IncUInt64(&VStats.hdr_loads);
2984     IncUInt64(&vp->stats.hdr_loads);
2985     VOL_UNLOCK;
2986 #endif /* AFS_DEMAND_ATTACH_FS */
2987
2988     if (*ec) {
2989         Log("VAttachVolume: Error reading diskDataHandle header for vol %lu; "
2990             "error=%u\n", afs_printable_uint32_lu(volid), *ec);
2991         goto done;
2992     }
2993
2994 #ifdef AFS_DEMAND_ATTACH_FS
2995 # ifdef FSSYNC_BUILD_CLIENT
2996  disk_header_loaded:
2997 # endif /* FSSYNC_BUILD_CLIENT */
2998
2999     /* if the lock type we actually used to lock the volume is different than
3000      * the lock type we should have used, retry with the lock type we should
3001      * use */
3002     use_locktype = VVolLockType(mode, VolumeWriteable(vp));
3003     if (locktype != use_locktype) {
3004         retry = 1;
3005         lock_tries++;
3006     }
3007 #endif /* AFS_DEMAND_ATTACH_FS */
3008
3009     *ec = 0;
3010
3011  done:
3012 #if defined(AFS_DEMAND_ATTACH_FS) && defined(FSSYNC_BUILD_CLIENT)
3013     if (!peek && *ec == 0 && retry == 0 && VMustCheckoutVolume(mode)) {
3014
3015         code = FSYNC_VerifyCheckout(volid, partp->name, FSYNC_VOL_NEEDVOLUME, mode);
3016
3017         if (code == SYNC_DENIED) {
3018             /* must retry checkout; fileserver no longer thinks we have
3019              * the volume */
3020             retry = 1;
3021             checkout_tries++;
3022             done_checkout = 0;
3023
3024         } else if (code != SYNC_OK) {
3025             *ec = VNOVOL;
3026         }
3027     }
3028 #endif /* AFS_DEMAND_ATTACH_FS && FSSYNC_BUILD_CLIENT */
3029
3030     if (*ec || retry) {
3031         /* either we are going to be called again for a second pass, or we
3032          * encountered an error; clean up in either case */
3033
3034 #ifdef AFS_DEMAND_ATTACH_FS
3035         if ((V_attachFlags(vp) & VOL_LOCKED)) {
3036             VUnlockVolume(vp);
3037         }
3038 #endif /* AFS_DEMAND_ATTACH_FS */
3039         if (vp->linkHandle) {
3040             IH_RELEASE(vp->vnodeIndex[vLarge].handle);
3041             IH_RELEASE(vp->vnodeIndex[vSmall].handle);
3042             IH_RELEASE(vp->diskDataHandle);
3043             IH_RELEASE(vp->linkHandle);
3044         }
3045     }
3046
3047     if (*ec) {
3048         VOL_LOCK;
3049         FreeVolumeHeader(vp);
3050         VOL_UNLOCK;
3051         return;
3052     }
3053     if (retry) {
3054         first_try = 0;
3055         goto retry;
3056     }
3057 }
3058
3059 #ifdef AFS_DEMAND_ATTACH_FS
3060 static void
3061 attach_check_vop(Error *ec, VolumeId volid, struct DiskPartition64 *partp,
3062                  Volume *vp, int *acheckedOut)
3063 {
3064     *ec = 0;
3065
3066     if (vp->pending_vol_op) {
3067
3068         VOL_LOCK;
3069
3070         if (vp->pending_vol_op->vol_op_state == FSSYNC_VolOpRunningUnknown) {
3071             int code;
3072             code = VVolOpLeaveOnlineNoHeader_r(vp, vp->pending_vol_op);
3073             if (code == 1) {
3074                 vp->pending_vol_op->vol_op_state = FSSYNC_VolOpRunningOnline;
3075             } else if (code == 0) {
3076                 vp->pending_vol_op->vol_op_state = FSSYNC_VolOpRunningOffline;
3077
3078             } else {
3079                 /* we need the vol header to determine if the volume can be
3080                  * left online for the vop, so... get the header */
3081
3082                 VOL_UNLOCK;
3083
3084                 /* attach header with peek=1 to avoid checking out the volume
3085                  * or locking it; we just want the header info, we're not
3086                  * messing with the volume itself at all */
3087                 attach_volume_header(ec, vp, partp, V_PEEK, 1, acheckedOut);
3088                 if (*ec) {
3089                     return;
3090                 }
3091
3092                 VOL_LOCK;
3093
3094                 if (VVolOpLeaveOnline_r(vp, vp->pending_vol_op)) {
3095                     vp->pending_vol_op->vol_op_state = FSSYNC_VolOpRunningOnline;
3096                 } else {
3097                     vp->pending_vol_op->vol_op_state = FSSYNC_VolOpRunningOffline;
3098                 }
3099
3100                 /* make sure we grab a new vol header and re-open stuff on
3101                  * actual attachment; we can't keep the data we grabbed, since
3102                  * it was not done under a lock and thus not safe */
3103                 FreeVolumeHeader(vp);
3104                 VReleaseVolumeHandles_r(vp);
3105             }
3106         }
3107         /* see if the pending volume op requires exclusive access */
3108         switch (vp->pending_vol_op->vol_op_state) {
3109         case FSSYNC_VolOpPending:
3110             /* this should never happen */
3111             opr_Assert(vp->pending_vol_op->vol_op_state
3112                             != FSSYNC_VolOpPending);
3113             break;
3114
3115         case FSSYNC_VolOpRunningUnknown:
3116             /* this should never happen; we resolved 'unknown' above */
3117             opr_Assert(vp->pending_vol_op->vol_op_state
3118                             != FSSYNC_VolOpRunningUnknown);
3119             break;
3120
3121         case FSSYNC_VolOpRunningOffline:
3122             /* mark the volume down */
3123             *ec = VOFFLINE;
3124             VChangeState_r(vp, VOL_STATE_UNATTACHED);
3125
3126             /* do not set V_offlineMessage here; we don't have ownership of
3127              * the volume (and probably do not have the header loaded), so we
3128              * can't alter the disk header */
3129
3130             /* check to see if we should set the specialStatus flag */
3131             if (VVolOpSetVBusy_r(vp, vp->pending_vol_op)) {
3132                 /* don't overwrite specialStatus if it was already set to
3133                  * something else (e.g. VMOVED) */
3134                 if (!vp->specialStatus) {
3135                     vp->specialStatus = VBUSY;
3136                 }
3137             }
3138             break;
3139
3140         default:
3141             break;
3142         }
3143
3144         VOL_UNLOCK;
3145     }
3146 }
3147 #endif /* AFS_DEMAND_ATTACH_FS */
3148
3149 /**
3150  * volume attachment helper function.
3151  *
3152  * @param[out] ec      error code
3153  * @param[in] volumeId volume ID of the attaching volume
3154  * @param[in] path     full path to the volume header .vol file
3155  * @param[in] partp    disk partition object for the attaching partition
3156  * @param[in] vp       volume object; vp->hashid, vp->device, vp->partition,
3157  *                     vp->vnode_list, vp->rx_call_list, and V_attachCV (for
3158  *                     DAFS) should already be initialized
3159  * @param[in] isbusy   1 if vp->specialStatus should be set to VBUSY; that is,
3160  *                     if there is a volume operation running for this volume
3161  *                     that should set the volume to VBUSY during its run. 0
3162  *                     otherwise. (see VVolOpSetVBusy_r)
3163  * @param[in] mode     attachment mode such as V_VOLUPD, V_DUMP, etc (see
3164  *                     volume.h)
3165  * @param[out] acheckedOut   If we successfully checked-out the volume from
3166  *                           the fileserver (if we needed to), this is set
3167  *                           to 1, otherwise it is 0.
3168  *
3169  * @return pointer to the semi-attached volume pointer
3170  *  @retval NULL an error occurred (check value of *ec)
3171  *  @retval vp volume successfully attaching
3172  *
3173  * @pre no locks held
3174  *
3175  * @post VOL_LOCK held
3176  */
3177 static Volume *
3178 attach2(Error * ec, VolumeId volumeId, char *path, struct DiskPartition64 *partp,
3179         Volume * vp, int isbusy, int mode, int *acheckedOut)
3180 {
3181     /* have we read in the header successfully? */
3182     int read_header = 0;
3183
3184 #ifdef AFS_DEMAND_ATTACH_FS
3185     /* should we FreeVolume(vp) instead of VCheckFree(vp) in the error
3186      * cleanup? */
3187     int forcefree = 0;
3188
3189     /* in the case of an error, to what state should the volume be
3190      * transitioned? */
3191     VolState error_state = VOL_STATE_ERROR;
3192 #endif /* AFS_DEMAND_ATTACH_FS */
3193
3194     *ec = 0;
3195
3196     vp->vnodeIndex[vLarge].handle = NULL;
3197     vp->vnodeIndex[vSmall].handle = NULL;
3198     vp->diskDataHandle = NULL;
3199     vp->linkHandle = NULL;
3200
3201     *acheckedOut = 0;
3202
3203 #ifdef AFS_DEMAND_ATTACH_FS
3204     attach_check_vop(ec, volumeId, partp, vp, acheckedOut);
3205     if (!*ec) {
3206         attach_volume_header(ec, vp, partp, mode, 0, acheckedOut);
3207     }
3208 #else
3209     attach_volume_header(ec, vp, partp, mode, 0, acheckedOut);
3210 #endif /* !AFS_DEMAND_ATTACH_FS */
3211
3212     if (*ec == VNOVOL) {
3213         /* if the volume doesn't exist, skip straight to 'error' so we don't
3214          * request a salvage */
3215         goto unlocked_error;
3216     }
3217
3218     if (!*ec) {
3219         read_header = 1;
3220
3221         /* ensure that we don't override specialStatus if it was set to
3222          * something else (e.g. VMOVED) */
3223         if (isbusy && !vp->specialStatus) {
3224             vp->specialStatus = VBUSY;
3225         }
3226         vp->shuttingDown = 0;
3227         vp->goingOffline = 0;
3228         vp->nUsers = 1;
3229 #ifdef AFS_DEMAND_ATTACH_FS
3230         vp->stats.last_attach = FT_ApproxTime();
3231         vp->stats.attaches++;
3232 #endif
3233
3234         VOL_LOCK;
3235         IncUInt64(&VStats.attaches);
3236         vp->cacheCheck = ++VolumeCacheCheck;
3237         /* just in case this ever rolls over */
3238         if (!vp->cacheCheck)
3239             vp->cacheCheck = ++VolumeCacheCheck;
3240         VOL_UNLOCK;
3241
3242 #ifdef AFS_DEMAND_ATTACH_FS
3243         V_attachFlags(vp) |= VOL_HDR_LOADED;
3244         vp->stats.last_hdr_load = vp->stats.last_attach;
3245 #endif /* AFS_DEMAND_ATTACH_FS */
3246     }
3247
3248     if (!*ec) {
3249         struct IndexFileHeader iHead;
3250
3251         /*
3252          * We just read in the diskstuff part of the header.  If the detailed
3253          * volume stats area has not yet been initialized, we should bzero the
3254          * area and mark it as initialized.
3255          */
3256         if (!(V_stat_initialized(vp))) {
3257             memset((V_stat_area(vp)), 0, VOL_STATS_BYTES);
3258             V_stat_initialized(vp) = 1;
3259         }
3260
3261         (void)ReadHeader(ec, vp->vnodeIndex[vSmall].handle,
3262                          (char *)&iHead, sizeof(iHead),
3263                          SMALLINDEXMAGIC, SMALLINDEXVERSION);
3264
3265         if (*ec) {
3266             Log("VAttachVolume: Error reading smallVnode vol header %s; error=%u\n", path, *ec);
3267         }
3268     }
3269
3270     if (!*ec) {
3271         struct IndexFileHeader iHead;
3272
3273         (void)ReadHeader(ec, vp->vnodeIndex[vLarge].handle,
3274                          (char *)&iHead, sizeof(iHead),
3275                          LARGEINDEXMAGIC, LARGEINDEXVERSION);
3276
3277         if (*ec) {
3278             Log("VAttachVolume: Error reading largeVnode vol header %s; error=%u\n", path, *ec);
3279         }
3280     }
3281
3282 #ifdef AFS_NAMEI_ENV
3283     if (!*ec) {
3284         struct versionStamp stamp;
3285
3286         (void)ReadHeader(ec, V_linkHandle(vp), (char *)&stamp,
3287                          sizeof(stamp), LINKTABLEMAGIC, LINKTABLEVERSION);
3288
3289         if (*ec) {
3290             Log("VAttachVolume: Error reading namei vol header %s; error=%u\n", path, *ec);
3291         }
3292     }
3293 #endif /* AFS_NAMEI_ENV */
3294
3295 #if defined(AFS_DEMAND_ATTACH_FS)
3296     if (*ec && ((*ec != VOFFLINE) || (V_attachState(vp) != VOL_STATE_UNATTACHED))) {
3297         VOL_LOCK;
3298         if (!VCanScheduleSalvage()) {
3299             Log("VAttachVolume: Error attaching volume %s; volume needs salvage; error=%u\n", path, *ec);
3300         }
3301         VRequestSalvage_r(ec, vp, SALVSYNC_ERROR, VOL_SALVAGE_NO_OFFLINE);
3302         vp->nUsers = 0;
3303
3304         goto locked_error;
3305     } else if (*ec) {
3306         /* volume operation in progress */
3307         VOL_LOCK;
3308         /* we have already transitioned the vp away from ATTACHING state, so we
3309          * can go right to the end of attach2, and we do not need to transition
3310          * to ERROR. */
3311         goto error_notbroken;
3312     }
3313 #else /* AFS_DEMAND_ATTACH_FS */
3314     if (*ec) {
3315         Log("VAttachVolume: Error attaching volume %s; volume needs salvage; error=%u\n", path, *ec);
3316         goto unlocked_error;
3317     }
3318 #endif /* AFS_DEMAND_ATTACH_FS */
3319
3320     if (V_needsSalvaged(vp)) {
3321         if (vp->specialStatus)
3322             vp->specialStatus = 0;
3323         VOL_LOCK;
3324 #if defined(AFS_DEMAND_ATTACH_FS)
3325         if (!VCanScheduleSalvage()) {
3326             Log("VAttachVolume: volume salvage flag is ON for %s; volume needs salvage\n", path);
3327         }
3328         VRequestSalvage_r(ec, vp, SALVSYNC_NEEDED, VOL_SALVAGE_NO_OFFLINE);
3329         vp->nUsers = 0;
3330
3331 #else /* AFS_DEMAND_ATTACH_FS */
3332         *ec = VSALVAGE;
3333 #endif /* AFS_DEMAND_ATTACH_FS */
3334
3335         goto locked_error;
3336     }
3337
3338     VOL_LOCK;
3339     vp->nextVnodeUnique = V_uniquifier(vp);
3340
3341     if (VShouldCheckInUse(mode) && V_inUse(vp) && VolumeWriteable(vp)) {
3342         if (!V_needsSalvaged(vp)) {
3343             V_needsSalvaged(vp) = 1;
3344             VUpdateVolume_r(ec, vp, 0);
3345         }
3346 #if defined(AFS_DEMAND_ATTACH_FS)
3347         if (!VCanScheduleSalvage()) {
3348             Log("VAttachVolume: volume %s needs to be salvaged; not attached.\n", path);
3349         }
3350         VRequestSalvage_r(ec, vp, SALVSYNC_NEEDED, VOL_SALVAGE_NO_OFFLINE);
3351         vp->nUsers = 0;
3352
3353 #else /* AFS_DEMAND_ATTACH_FS */
3354         Log("VAttachVolume: volume %s needs to be salvaged; not attached.\n", path);
3355         *ec = VSALVAGE;
3356 #endif /* AFS_DEMAND_ATTACH_FS */
3357
3358         goto locked_error;
3359     }
3360
3361     if (programType == fileServer && V_destroyMe(vp) == DESTROY_ME) {
3362         /* Only check destroyMe if we are the fileserver, since the
3363          * volserver et al sometimes need to work with volumes with
3364          * destroyMe set. Examples are 'temporary' volumes the
3365          * volserver creates, and when we create a volume (destroyMe
3366          * is set on creation; sometimes a separate volserver
3367          * transaction is created to clear destroyMe).
3368          */
3369
3370 #if defined(AFS_DEMAND_ATTACH_FS)
3371         /* schedule a salvage so the volume goes away on disk */
3372         VRequestSalvage_r(ec, vp, SALVSYNC_ERROR, VOL_SALVAGE_NO_OFFLINE);
3373         VChangeState_r(vp, VOL_STATE_ERROR);
3374         vp->nUsers = 0;
3375         forcefree = 1;
3376 #endif /* AFS_DEMAND_ATTACH_FS */
3377         Log("VAttachVolume: volume %s is junk; it should be destroyed at next salvage\n", path);
3378         *ec = VNOVOL;
3379         goto locked_error;
3380     }
3381
3382     vp->vnodeIndex[vSmall].bitmap = vp->vnodeIndex[vLarge].bitmap = NULL;
3383 #ifndef BITMAP_LATER
3384     if (programType == fileServer && VolumeWriteable(vp)) {
3385         int i;
3386         for (i = 0; i < nVNODECLASSES; i++) {
3387             VGetBitmap_r(ec, vp, i);
3388             if (*ec) {
3389 #ifdef AFS_DEMAND_ATTACH_FS
3390                 VRequestSalvage_r(ec, vp, SALVSYNC_ERROR, VOL_SALVAGE_NO_OFFLINE);
3391                 vp->nUsers = 0;
3392 #endif /* AFS_DEMAND_ATTACH_FS */
3393                 Log("VAttachVolume: error getting bitmap for volume (%s)\n",
3394                     path);
3395                 goto locked_error;
3396             }
3397         }
3398     }
3399 #endif /* BITMAP_LATER */
3400
3401     if (VInit >= 2 && V_needsCallback(vp)) {
3402         if (V_BreakVolumeCallbacks) {
3403             Log("VAttachVolume: Volume %lu was changed externally; breaking callbacks\n",
3404                 afs_printable_uint32_lu(V_id(vp)));
3405             V_needsCallback(vp) = 0;
3406             VOL_UNLOCK;
3407             (*V_BreakVolumeCallbacks) (V_id(vp));
3408             VOL_LOCK;
3409
3410             VUpdateVolume_r(ec, vp, 0);
3411         }
3412 #ifdef FSSYNC_BUILD_CLIENT
3413         else if (VCanUseFSSYNC()) {
3414             afs_int32 fsync_code;
3415
3416             V_needsCallback(vp) = 0;
3417             VOL_UNLOCK;
3418             fsync_code = FSYNC_VolOp(V_id(vp), NULL, FSYNC_VOL_BREAKCBKS, FSYNC_WHATEVER, NULL);
3419             VOL_LOCK;
3420
3421             if (fsync_code) {
3422                 V_needsCallback(vp) = 1;
3423                 Log("Error trying to tell the fileserver to break callbacks for "
3424                     "changed volume %lu; error code %ld\n",
3425                     afs_printable_uint32_lu(V_id(vp)),
3426                     afs_printable_int32_ld(fsync_code));
3427             } else {
3428                 VUpdateVolume_r(ec, vp, 0);
3429             }
3430         }
3431 #endif /* FSSYNC_BUILD_CLIENT */
3432
3433         if (*ec) {
3434             Log("VAttachVolume: error %d clearing needsCallback on volume "
3435                 "%lu; needs salvage\n", (int)*ec,
3436                 afs_printable_uint32_lu(V_id(vp)));
3437 #ifdef AFS_DEMAND_ATTACH_FS
3438             VRequestSalvage_r(ec, vp, SALVSYNC_ERROR, VOL_SALVAGE_NO_OFFLINE);
3439             vp->nUsers = 0;
3440 #else /* !AFS_DEMAND_ATTACH_FS */
3441             *ec = VSALVAGE;
3442 #endif /* !AFS_DEMAND_ATTACh_FS */
3443             goto locked_error;
3444         }
3445     }
3446
3447     if (programType == fileServer) {
3448         if (vp->specialStatus)
3449             vp->specialStatus = 0;
3450         if (V_blessed(vp) && V_inService(vp) && !V_needsSalvaged(vp)) {
3451             V_inUse(vp) = fileServer;
3452             V_offlineMessage(vp)[0] = '\0';
3453         }
3454 #ifdef AFS_DEMAND_ATTACH_FS
3455         /* check if the volume is actually usable. only do this for DAFS; for
3456          * non-DAFS, volumes that are not inService/blessed can still be
3457          * attached, even if clients cannot access them. this is relevant
3458          * because for non-DAFS, we try to attach the volume when e.g.
3459          * volserver gives us back then vol when its done with it, but
3460          * volserver may give us back a volume that is not inService/blessed. */
3461
3462         if (!V_inUse(vp)) {
3463             *ec = VNOVOL;
3464             /* Put the vol into PREATTACHED state, so if someone tries to
3465              * access it again, we try to attach, see that we're not blessed,
3466              * and give a VNOVOL error again. Putting it into UNATTACHED state
3467              * would result in a VOFFLINE error instead. */
3468             error_state = VOL_STATE_PREATTACHED;
3469
3470             /* mimic e.g. GetVolume errors */
3471             if (!V_blessed(vp)) {
3472                 Log("Volume %lu offline: not blessed\n", afs_printable_uint32_lu(V_id(vp)));
3473                 FreeVolumeHeader(vp);
3474             } else if (!V_inService(vp)) {
3475                 Log("Volume %lu offline: not in service\n", afs_printable_uint32_lu(V_id(vp)));
3476                 FreeVolumeHeader(vp);
3477             } else {
3478                 Log("Volume %lu offline: needs salvage\n", afs_printable_uint32_lu(V_id(vp)));
3479                 *ec = VSALVAGE;
3480                 error_state = VOL_STATE_ERROR;
3481                 /* see if we can recover */
3482                 VRequestSalvage_r(ec, vp, SALVSYNC_NEEDED, VOL_SALVAGE_NO_OFFLINE);
3483             }
3484             vp->nUsers = 0;
3485             goto locked_error;
3486         }
3487 #endif /* AFS_DEMAND_ATTACH_FS */
3488     } else {
3489 #ifdef AFS_DEMAND_ATTACH_FS
3490         if ((mode != V_PEEK) && (mode != V_SECRETLY) && (mode != V_READONLY))
3491             V_inUse(vp) = programType;
3492 #endif /* AFS_DEMAND_ATTACH_FS */
3493         V_checkoutMode(vp) = mode;
3494     }
3495
3496     AddVolumeToHashTable(vp, vp->hashid);
3497 #ifdef AFS_DEMAND_ATTACH_FS
3498     if (VCanUnlockAttached() && (V_attachFlags(vp) & VOL_LOCKED)) {
3499         VUnlockVolume(vp);
3500     }
3501     if ((programType != fileServer) ||
3502         (V_inUse(vp) == fileServer)) {
3503         AddVolumeToVByPList_r(vp);
3504         VLRU_Add_r(vp);
3505         VChangeState_r(vp, VOL_STATE_ATTACHED);
3506     } else {
3507         VChangeState_r(vp, VOL_STATE_UNATTACHED);
3508     }
3509 #endif
3510
3511     return vp;
3512
3513 unlocked_error:
3514     VOL_LOCK;
3515 locked_error:
3516 #ifdef AFS_DEMAND_ATTACH_FS
3517     if (!VIsErrorState(V_attachState(vp))) {
3518         if (programType != fileServer && *ec == VNOVOL) {
3519             /* do not log anything in this case; it is common for
3520              * non-fileserver programs to fail here with VNOVOL, since that
3521              * is what happens when they simply try to use a volume, but that
3522              * volume doesn't exist. */
3523
3524         } else if (VIsErrorState(error_state)) {
3525             Log("attach2: forcing vol %" AFS_VOLID_FMT " to error state (state %u flags 0x%x ec %d)\n",
3526                 afs_printable_VolumeId_lu(vp->hashid), V_attachState(vp),
3527                 V_attachFlags(vp), *ec);
3528         }
3529         VChangeState_r(vp, error_state);
3530     }
3531 #endif /* AFS_DEMAND_ATTACH_FS */
3532
3533     if (read_header) {
3534         VReleaseVolumeHandles_r(vp);
3535     }
3536
3537 #ifdef AFS_DEMAND_ATTACH_FS
3538  error_notbroken:
3539     if (VCheckSalvage(vp) == VCHECK_SALVAGE_FAIL) {
3540         /* The salvage could not be scheduled with the salvage server
3541          * due to a hard error. Reset the error code to prevent retry loops by
3542          * callers. */
3543         if (*ec == VSALVAGING) {
3544             *ec = VSALVAGE;
3545         }
3546     }
3547     if (forcefree) {
3548         FreeVolume(vp);
3549     } else {
3550         VCheckFree(vp);
3551     }
3552 #else /* !AFS_DEMAND_ATTACH_FS */
3553     FreeVolume(vp);
3554 #endif /* !AFS_DEMAND_ATTACH_FS */
3555     return NULL;
3556 }
3557
3558 /* Attach an existing volume.
3559    The volume also normally goes online at this time.
3560    An offline volume must be reattached to make it go online.
3561  */
3562
3563 Volume *
3564 VAttachVolume(Error * ec, VolumeId volumeId, int mode)
3565 {
3566     Volume *retVal;
3567     VOL_LOCK;
3568     retVal = VAttachVolume_r(ec, volumeId, mode);
3569     VOL_UNLOCK;
3570     return retVal;
3571 }
3572
3573 Volume *
3574 VAttachVolume_r(Error * ec, VolumeId volumeId, int mode)
3575 {
3576     char *part, *name;
3577     VGetVolumePath(ec, volumeId, &part, &name);
3578     if (*ec) {
3579         Volume *vp;
3580         Error error;
3581         vp = VGetVolume_r(&error, volumeId);
3582         if (vp) {
3583             opr_Assert(V_inUse(vp) == 0);
3584             VDetachVolume_r(ec, vp);
3585         }
3586         return NULL;
3587     }
3588     return VAttachVolumeByName_r(ec, part, name, mode);
3589 }
3590
3591 /* Increment a reference count to a volume, sans context swaps.  Requires
3592  * possibly reading the volume header in from the disk, since there's
3593  * an invariant in the volume package that nUsers>0 ==> vp->header is valid.
3594  *
3595  * N.B. This call can fail if we can't read in the header!!  In this case
3596  * we still guarantee we won't context swap, but the ref count won't be
3597  * incremented (otherwise we'd violate the invariant).
3598  */
3599 /* NOTE: with the demand attach fileserver extensions, the global lock
3600  * is dropped within VHold */
3601 #ifdef AFS_DEMAND_ATTACH_FS
3602 static int
3603 VHold_r(Volume * vp)
3604 {
3605     Error error;
3606
3607     VCreateReservation_r(vp);
3608     VWaitExclusiveState_r(vp);
3609
3610     LoadVolumeHeader(&error, vp);
3611     if (error) {
3612         VCancelReservation_r(vp);
3613         return error;
3614     }
3615     vp->nUsers++;
3616     VCancelReservation_r(vp);
3617     return 0;
3618 }
3619 #else /* AFS_DEMAND_ATTACH_FS */
3620 static int
3621 VHold_r(Volume * vp)
3622 {
3623     Error error;
3624
3625     LoadVolumeHeader(&error, vp);
3626     if (error)
3627         return error;
3628     vp->nUsers++;
3629     return 0;
3630 }
3631 #endif /* AFS_DEMAND_ATTACH_FS */
3632
3633 /**** volume timeout-related stuff ****/
3634
3635 #ifdef AFS_PTHREAD_ENV
3636
3637 static struct timespec *shutdown_timeout;
3638 static pthread_once_t shutdown_timeout_once = PTHREAD_ONCE_INIT;
3639
3640 static_inline int
3641 VTimedOut(const struct timespec *ts)
3642 {
3643     struct timeval tv;
3644     int code;
3645
3646     if (ts->tv_sec == 0) {
3647         /* short-circuit; this will have always timed out */
3648         return 1;
3649     }
3650
3651     code = gettimeofday(&tv, NULL);
3652     if (code) {
3653         Log("Error %d from gettimeofday, assuming we have not timed out\n", errno);
3654         /* assume no timeout; failure mode is we just wait longer than normal
3655          * instead of returning errors when we shouldn't */
3656         return 0;
3657     }
3658
3659     if (tv.tv_sec < ts->tv_sec ||
3660         (tv.tv_sec == ts->tv_sec && tv.tv_usec*1000 < ts->tv_nsec)) {
3661
3662         return 0;
3663     }
3664
3665     return 1;
3666 }
3667
3668 /**
3669  * Calculate an absolute timeout.
3670  *
3671  * @param[out] ts  A timeout that is "timeout" seconds from now, if we return
3672  *                 NULL, the memory is not touched
3673  * @param[in]  timeout  How long the timeout should be from now
3674  *
3675  * @return timeout to use
3676  *  @retval NULL      no timeout; wait forever
3677  *  @retval non-NULL  the given value for "ts"
3678  *
3679  * @internal
3680  */
3681 static struct timespec *
3682 VCalcTimeout(struct timespec *ts, afs_int32 timeout)
3683 {
3684     struct timeval now;
3685     int code;
3686
3687     if (timeout < 0) {
3688         return NULL;
3689     }
3690
3691     if (timeout == 0) {
3692         ts->tv_sec = ts->tv_nsec = 0;
3693         return ts;
3694     }
3695
3696     code = gettimeofday(&now, NULL);
3697     if (code) {
3698         Log("Error %d from gettimeofday, falling back to 'forever' timeout\n", errno);
3699         return NULL;
3700     }
3701
3702     ts->tv_sec = now.tv_sec + timeout;
3703     ts->tv_nsec = now.tv_usec * 1000;
3704
3705     return ts;
3706 }
3707
3708 /**
3709  * Initialize the shutdown_timeout global.
3710  */
3711 static void
3712 VShutdownTimeoutInit(void)
3713 {
3714     struct timespec *ts;
3715
3716     ts = malloc(sizeof(*ts));
3717
3718     shutdown_timeout = VCalcTimeout(ts, vol_opts.offline_shutdown_timeout);
3719
3720     if (!shutdown_timeout) {
3721         free(ts);
3722     }
3723 }
3724
3725 /**
3726  * Figure out the timeout that should be used for waiting for offline volumes.
3727  *
3728  * @param[out] ats  Storage space for a local timeout value if needed
3729  *
3730  * @return The timeout value that should be used
3731  *   @retval NULL      No timeout; wait forever for offlining volumes
3732  *   @retval non-NULL  A pointer to the absolute time that should be used as
3733  *                     the deadline for waiting for offlining volumes.
3734  *
3735  * @note If we return non-NULL, the pointer we return may or may not be the
3736  *       same as "ats"
3737  */
3738 static const struct timespec *
3739 VOfflineTimeout(struct timespec *ats)
3740 {
3741     if (vol_shutting_down) {
3742         opr_Verify(pthread_once(&shutdown_timeout_once,
3743                                 VShutdownTimeoutInit) == 0);
3744         return shutdown_timeout;
3745     } else {
3746         return VCalcTimeout(ats, vol_opts.offline_timeout);
3747     }
3748 }
3749
3750 #else /* AFS_PTHREAD_ENV */
3751
3752 /* Waiting a certain amount of time for offlining volumes is not supported
3753  * for LWP due to a lack of primitives. So, we never time out */
3754 # define VTimedOut(x) (0)
3755 # define VOfflineTimeout(x) (NULL)
3756
3757 #endif /* !AFS_PTHREAD_ENV */
3758
3759 #if 0
3760 static int
3761 VHold(Volume * vp)
3762 {
3763     int retVal;
3764     VOL_LOCK;
3765     retVal = VHold_r(vp);
3766     VOL_UNLOCK;
3767     return retVal;
3768 }
3769 #endif
3770
3771 static afs_int32
3772 VIsGoingOffline_r(struct Volume *vp)
3773 {
3774     afs_int32 code = 0;
3775
3776     if (vp->goingOffline) {
3777         if (vp->specialStatus) {
3778             code = vp->specialStatus;
3779         } else if (V_inService(vp) == 0 || V_blessed(vp) == 0) {
3780             code = VNOVOL;
3781         } else {
3782             code = VOFFLINE;
3783         }
3784     }
3785
3786     return code;
3787 }
3788
3789 /**
3790  * Tell the caller if a volume is waiting to go offline.
3791  *
3792  * @param[in] vp  The volume we want to know about
3793  *
3794  * @return volume status
3795  *   @retval 0 volume is not waiting to go offline, go ahead and use it
3796  *   @retval nonzero volume is waiting to offline, and give the returned code
3797  *           as an error to anyone accessing the volume
3798  *
3799  * @pre VOL_LOCK is NOT held
3800  * @pre caller holds a heavyweight reference on vp
3801  */
3802 afs_int32
3803 VIsGoingOffline(struct Volume *vp)
3804 {
3805     afs_int32 code;
3806
3807     VOL_LOCK;
3808     code = VIsGoingOffline_r(vp);
3809     VOL_UNLOCK;
3810
3811     return code;
3812 }
3813
3814 /**
3815  * Register an RX call with a volume.
3816  *
3817  * @param[inout] ec        Error code; if unset when passed in, may be set if
3818  *                         the volume starts going offline
3819  * @param[out]   client_ec @see GetVolume
3820  * @param[in] vp   Volume struct
3821  * @param[in] cbv  VCallByVol struct containing the RX call to register
3822  *
3823  * @pre VOL_LOCK held
3824  * @pre caller holds heavy ref on vp
3825  *
3826  * @internal
3827  */
3828 static void
3829 VRegisterCall_r(Error *ec, Error *client_ec, Volume *vp, struct VCallByVol *cbv)
3830 {
3831     if (vp && cbv) {
3832 #ifdef AFS_DEMAND_ATTACH_FS
3833         if (!*ec) {
3834             /* just in case the volume started going offline after we got the
3835              * reference to it... otherwise, if the volume started going
3836              * offline right at the end of GetVolume(), we might race with the
3837              * RX call scanner, and return success and add our cbv to the
3838              * rx_call_list _after_ the scanner has scanned the list. */
3839             *ec = VIsGoingOffline_r(vp);
3840             if (client_ec) {
3841                 *client_ec = *ec;
3842             }
3843         }
3844
3845         while (V_attachState(vp) == VOL_STATE_SCANNING_RXCALLS) {
3846             VWaitStateChange_r(vp);
3847         }
3848 #endif /* AFS_DEMAND_ATTACH_FS */
3849
3850         queue_Prepend(&vp->rx_call_list, cbv);
3851     }
3852 }
3853
3854 /**
3855  * Deregister an RX call with a volume.
3856  *
3857  * @param[in] vp   Volume struct
3858  * @param[in] cbv  VCallByVol struct containing the RX call to deregister
3859  *
3860  * @pre VOL_LOCK held
3861  * @pre caller holds heavy ref on vp
3862  *
3863  * @internal
3864  */
3865 static void
3866 VDeregisterCall_r(Volume *vp, struct VCallByVol *cbv)
3867 {
3868     if (cbv && queue_IsOnQueue(cbv)) {
3869 #ifdef AFS_DEMAND_ATTACH_FS
3870         while (V_attachState(vp) == VOL_STATE_SCANNING_RXCALLS) {
3871             VWaitStateChange_r(vp);
3872         }
3873 #endif /* AFS_DEMAND_ATTACH_FS */
3874
3875         queue_Remove(cbv);
3876     }
3877 }
3878
3879 /***************************************************/
3880 /* get and put volume routines                     */
3881 /***************************************************/
3882
3883 /**
3884  * put back a heavyweight reference to a volume object.
3885  *
3886  * @param[in] vp  volume object pointer
3887  *
3888  * @pre VOL_LOCK held
3889  *
3890  * @post heavyweight volume reference put back.
3891  *       depending on state, volume may have been taken offline,
3892  *       detached, salvaged, freed, etc.
3893  *
3894  * @internal volume package internal use only
3895  */
3896 void
3897 VPutVolume_r(Volume * vp)
3898 {
3899     opr_Verify(--vp->nUsers >= 0);
3900     if (vp->nUsers == 0) {
3901         VCheckOffline(vp);
3902         ReleaseVolumeHeader(vp->header);
3903 #ifdef AFS_DEMAND_ATTACH_FS
3904         if (!VCheckDetach(vp)) {
3905             VCheckSalvage(vp);
3906             VCheckFree(vp);
3907         }
3908 #else /* AFS_DEMAND_ATTACH_FS */
3909         VCheckDetach(vp);
3910 #endif /* AFS_DEMAND_ATTACH_FS */
3911     }
3912 }
3913
3914 void
3915 VPutVolume(Volume * vp)
3916 {
3917     VOL_LOCK;
3918     VPutVolume_r(vp);
3919     VOL_UNLOCK;
3920 }
3921
3922 /**
3923  * Puts a volume reference obtained with VGetVolumeWithCall.
3924  *
3925  * @param[in] vp  Volume struct
3926  * @param[in] cbv VCallByVol struct given to VGetVolumeWithCall, or NULL if none
3927  *
3928  * @pre VOL_LOCK is NOT held
3929  */
3930 void
3931 VPutVolumeWithCall(Volume *vp, struct VCallByVol *cbv)
3932 {
3933     VOL_LOCK;
3934     VDeregisterCall_r(vp, cbv);
3935     VPutVolume_r(vp);
3936     VOL_UNLOCK;
3937 }
3938
3939 /* Get a pointer to an attached volume.  The pointer is returned regardless
3940    of whether or not the volume is in service or on/off line.  An error
3941    code, however, is returned with an indication of the volume's status */
3942 Volume *
3943 VGetVolume(Error * ec, Error * client_ec, VolumeId volumeId)
3944 {
3945     Volume *retVal;
3946     VOL_LOCK;
3947     retVal = GetVolume(ec, client_ec, volumeId, NULL, 0);
3948     VOL_UNLOCK;
3949     return retVal;
3950 }
3951
3952 /**
3953  * Get a volume reference associated with an RX call.
3954  *
3955  * @param[out] ec @see GetVolume
3956  * @param[out] client_ec @see GetVolume
3957  * @param[in] volumeId @see GetVolume
3958  * @param[in] ts  How long to wait for going-offline volumes (absolute time).
3959  *                If NULL, wait forever. If ts->tv_sec == 0, return immediately
3960  *                with an error if the volume is going offline.
3961  * @param[in] cbv Contains an RX call to be associated with this volume
3962  *                reference. This call may be interrupted if the volume is
3963  *                requested to go offline while we hold a ref on it. Give NULL
3964  *                to not associate an RX call with this reference.
3965  *
3966  * @return @see GetVolume
3967  *
3968  * @note for LWP builds, ts must be NULL
3969  *
3970  * @note A reference obtained with this function MUST be put back with
3971  *       VPutVolumeWithCall
3972  */
3973 Volume *
3974 VGetVolumeWithCall(Error * ec, Error * client_ec, VolumeId volumeId,
3975                    const struct timespec *ts, struct VCallByVol *cbv)
3976 {
3977     Volume *retVal;
3978     VOL_LOCK;
3979     retVal = GetVolume(ec, client_ec, volumeId, NULL, ts);
3980     VRegisterCall_r(ec, client_ec, retVal, cbv);
3981     VOL_UNLOCK;
3982     return retVal;
3983 }
3984
3985 Volume *
3986 VGetVolume_r(Error * ec, VolumeId volumeId)
3987 {
3988     return GetVolume(ec, NULL, volumeId, NULL, NULL);
3989 }
3990
3991 /* try to get a volume we've previously looked up */
3992 /* for demand attach fs, caller MUST NOT hold a ref count on vp */
3993 Volume *
3994 VGetVolumeByVp_r(Error * ec, Volume * vp)
3995 {
3996     return GetVolume(ec, NULL, vp->hashid, vp, NULL);
3997 }
3998
3999 /**
4000  * private interface for getting a volume handle
4001  *
4002  * @param[out] ec         error code (0 if no error)
4003  * @param[out] client_ec  wire error code to be given to clients
4004  * @param[in]  volumeId   ID of the volume we want
4005  * @param[in]  hint       optional hint for hash lookups, or NULL
4006  * @param[in]  timeout    absolute deadline for waiting for the volume to go
4007  *                        offline, if it is going offline. NULL to wait forever.
4008  *
4009  * @return a volume handle for the specified volume
4010  *  @retval NULL an error occurred, or the volume is in such a state that
4011  *               we cannot load a header or return any volume struct
4012  *
4013  * @note for DAFS, caller must NOT hold a ref count on 'hint'
4014  *
4015  * @note 'timeout' is only checked if the volume is actually going offline; so
4016  *       if you pass timeout->tv_sec = 0, this will exhibit typical
4017  *       nonblocking behavior.
4018  *
4019  * @note for LWP builds, 'timeout' must be NULL
4020  */
4021 static Volume *
4022 GetVolume(Error * ec, Error * client_ec, VolumeId volumeId, Volume * hint,
4023           const struct timespec *timeout)
4024 {
4025     Volume *vp = hint;
4026     /* pull this profiling/debugging code out of regular builds */
4027 #ifdef notdef
4028 #define VGET_CTR_INC(x) x++
4029     unsigned short V0 = 0, V1 = 0, V2 = 0, V3 = 0, V5 = 0, V6 =
4030         0, V7 = 0, V8 = 0, V9 = 0;
4031     unsigned short V10 = 0, V11 = 0, V12 = 0, V13 = 0, V14 = 0, V15 = 0;
4032 #else
4033 #define VGET_CTR_INC(x)
4034 #endif
4035 #ifdef AFS_DEMAND_ATTACH_FS
4036     Volume *avp, * rvp = hint;
4037 #endif
4038
4039     /*
4040      * if VInit is zero, the volume package dynamic
4041      * data structures have not been initialized yet,
4042      * and we must immediately return an error
4043      */
4044     if (VInit == 0) {
4045         vp = NULL;
4046         *ec = VOFFLINE;
4047         if (client_ec) {
4048             *client_ec = VOFFLINE;
4049         }
4050         goto not_inited;
4051     }
4052
4053 #ifdef AFS_DEMAND_ATTACH_FS
4054     if (rvp) {
4055         VCreateReservation_r(rvp);
4056     }
4057 #endif /* AFS_DEMAND_ATTACH_FS */
4058
4059     for (;;) {
4060         *ec = 0;
4061         if (client_ec)
4062             *client_ec = 0;
4063         VGET_CTR_INC(V0);
4064
4065         vp = VLookupVolume_r(ec, volumeId, vp);
4066         if (*ec) {
4067             vp = NULL;
4068             break;
4069         }
4070
4071 #ifdef AFS_DEMAND_ATTACH_FS
4072         if (rvp && (rvp != vp)) {
4073             /* break reservation on old vp */
4074             VCancelReservation_r(rvp);
4075             rvp = NULL;
4076         }
4077 #endif /* AFS_DEMAND_ATTACH_FS */
4078
4079         if (!vp) {
4080             VGET_CTR_INC(V1);
4081             if (VInit < 2) {
4082                 VGET_CTR_INC(V2);
4083                 /* Until we have reached an initialization level of 2
4084                  * we don't know whether this volume exists or not.
4085                  * We can't sleep and retry later because before a volume
4086                  * is attached, the caller tries to get it first.  Just
4087                  * return VOFFLINE and the caller can choose whether to
4088                  * retry the command or not. */
4089                 *ec = VOFFLINE;
4090                 break;
4091             }
4092
4093             *ec = VNOVOL;
4094             break;
4095         }
4096
4097         VGET_CTR_INC(V3);
4098         IncUInt64(&VStats.hdr_gets);
4099
4100 #ifdef AFS_DEMAND_ATTACH_FS
4101         /* block if someone else is performing an exclusive op on this volume */
4102         if (rvp != vp) {
4103             rvp = vp;
4104             VCreateReservation_r(rvp);
4105         }
4106         VWaitExclusiveState_r(vp);
4107
4108         /* short circuit with VNOVOL in the following circumstances:
4109          *
4110          *   - VOL_STATE_ERROR
4111          *   - VOL_STATE_SHUTTING_DOWN
4112          */
4113         if ((V_attachState(vp) == VOL_STATE_ERROR) ||
4114             (V_attachState(vp) == VOL_STATE_SHUTTING_DOWN)) {
4115             *ec = VNOVOL;
4116             vp = NULL;
4117             break;
4118         }
4119
4120         /*
4121          * short circuit with VOFFLINE for VOL_STATE_UNATTACHED/GOING_OFFLINE and
4122          *                    VNOVOL   for VOL_STATE_DELETED
4123          */
4124        if ((V_attachState(vp) == VOL_STATE_UNATTACHED) ||
4125            (V_attachState(vp) == VOL_STATE_GOING_OFFLINE) ||
4126            (V_attachState(vp) == VOL_STATE_DELETED)) {
4127            if (vp->specialStatus) {
4128                *ec = vp->specialStatus;
4129            } else if (V_attachState(vp) == VOL_STATE_DELETED) {
4130                *ec = VNOVOL;
4131            } else {
4132                *ec = VOFFLINE;
4133            }
4134            vp = NULL;
4135            break;
4136        }
4137
4138         /* allowable states:
4139          *   - PREATTACHED
4140          *   - ATTACHED
4141          *   - SALVAGING
4142          *   - SALVAGE_REQ
4143          */
4144
4145         if (vp->salvage.requested) {
4146             VUpdateSalvagePriority_r(vp);
4147         }
4148
4149         if (V_attachState(vp) == VOL_STATE_PREATTACHED) {
4150             if (vp->specialStatus) {
4151                 *ec = vp->specialStatus;
4152                 vp = NULL;
4153                 break;
4154             }
4155             avp = VAttachVolumeByVp_r(ec, vp, 0);
4156             if (avp) {
4157                 if (vp != avp) {
4158                     /* VAttachVolumeByVp_r can return a pointer
4159                      * != the vp passed to it under certain
4160                      * conditions; make sure we don't leak
4161                      * reservations if that happens */
4162                     vp = avp;
4163                     VCancelReservation_r(rvp);
4164                     rvp = avp;
4165                     VCreateReservation_r(rvp);
4166                 }
4167                 VPutVolume_r(avp);
4168             }
4169             if (*ec) {
4170                 int endloop = 0;
4171                 switch (*ec) {
4172                 case VSALVAGING:
4173                     break;
4174                 case VOFFLINE:
4175                     endloop = 1;
4176                     if (vp->specialStatus) {
4177                         *ec = vp->specialStatus;
4178                     }
4179                     break;
4180
4181                 default:
4182                     if (vp->specialStatus) {
4183                         *ec = vp->specialStatus;
4184                     } else {
4185                         *ec = VNOVOL;
4186                     }
4187                     endloop = 1;
4188                 }
4189                 if (endloop) {
4190                     vp = NULL;
4191                     break;
4192                 }
4193             }
4194         }
4195
4196         if (VIsSalvaging(vp) || (*ec == VSALVAGING)) {
4197             if (client_ec) {
4198                 /* see CheckVnode() in afsfileprocs.c for an explanation
4199                  * of this error code logic */
4200                 afs_uint32 now = FT_ApproxTime();
4201                 if ((vp->stats.last_salvage + (10 * 60)) >= now) {
4202                     *client_ec = VBUSY;
4203                 } else {
4204                     *client_ec = VRESTARTING;
4205                 }
4206             }
4207             *ec = VSALVAGING;
4208             vp = NULL;
4209             break;
4210         }
4211
4212         if (VIsErrorState(V_attachState(vp))) {
4213             /* make sure we don't take a vp in VOL_STATE_ERROR state and use
4214              * it, or transition it out of that state */
4215             if (!*ec) {
4216                 *ec = VNOVOL;
4217             }
4218             vp = NULL;
4219             break;
4220         }
4221
4222         /*
4223          * this test MUST happen after VAttachVolymeByVp, so we have no
4224          * conflicting vol op. (attach2 would have errored out if we had one;
4225          * specifically attach_check_vop must have detected a conflicting vop)
4226          */
4227          opr_Assert(!vp->pending_vol_op || vp->pending_vol_op->vol_op_state == FSSYNC_VolOpRunningOnline);
4228
4229 #endif /* AFS_DEMAND_ATTACH_FS */
4230
4231         LoadVolumeHeader(ec, vp);
4232         if (*ec) {
4233             VGET_CTR_INC(V6);
4234             /* Only log the error if it was a totally unexpected error.  Simply
4235              * a missing inode is likely to be caused by the volume being deleted */
4236             if (errno != ENXIO || LogLevel)
4237                 Log("Volume %" AFS_VOLID_FMT ": couldn't reread volume header\n",
4238                     afs_printable_VolumeId_lu(vp->hashid));
4239 #ifdef AFS_DEMAND_ATTACH_FS
4240             if (VCanScheduleSalvage()) {
4241                 VRequestSalvage_r(ec, vp, SALVSYNC_ERROR, 0 /*flags*/);
4242             } else {
4243                 FreeVolume(vp);
4244                 vp = NULL;
4245             }
4246 #else /* AFS_DEMAND_ATTACH_FS */
4247             FreeVolume(vp);
4248             vp = NULL;
4249 #endif /* AFS_DEMAND_ATTACH_FS */
4250             break;
4251         }
4252
4253         VGET_CTR_INC(V7);
4254         if (vp->shuttingDown) {
4255             VGET_CTR_INC(V8);
4256             *ec = VNOVOL;
4257             vp = NULL;
4258             break;
4259         }
4260
4261         if (programType == fileServer) {
4262             VGET_CTR_INC(V9);
4263             if (vp->goingOffline) {
4264                 if (timeout && VTimedOut(timeout)) {
4265                     /* we've timed out; don't wait for the vol */
4266                 } else {
4267                     VGET_CTR_INC(V10);
4268 #ifdef AFS_DEMAND_ATTACH_FS
4269                     /* wait for the volume to go offline */
4270                     if (V_attachState(vp) == VOL_STATE_GOING_OFFLINE) {
4271                         VTimedWaitStateChange_r(vp, timeout, NULL);
4272                     }
4273 #elif defined(AFS_PTHREAD_ENV)
4274                     VOL_CV_TIMEDWAIT(&vol_put_volume_cond, timeout, NULL);
4275 #else /* AFS_PTHREAD_ENV */
4276                     /* LWP has no timed wait, so the caller better not be
4277                      * expecting one */
4278                     opr_Assert(!timeout);
4279                     LWP_WaitProcess(VPutVolume);
4280 #endif /* AFS_PTHREAD_ENV */
4281                     continue;
4282                 }
4283             }
4284             if (vp->specialStatus) {
4285                 VGET_CTR_INC(V11);
4286                 *ec = vp->specialStatus;
4287             } else if (V_inService(vp) == 0 || V_blessed(vp) == 0) {
4288                 VGET_CTR_INC(V12);
4289                 *ec = VNOVOL;
4290             } else if (V_inUse(vp) == 0 || vp->goingOffline) {
4291                 VGET_CTR_INC(V13);
4292                 *ec = VOFFLINE;
4293             } else {
4294                 VGET_CTR_INC(V14);
4295             }
4296         }
4297         break;
4298     }
4299     VGET_CTR_INC(V15);
4300
4301 #ifdef AFS_DEMAND_ATTACH_FS
4302     /* if no error, bump nUsers */
4303     if (vp) {
4304         vp->nUsers++;
4305         VLRU_UpdateAccess_r(vp);
4306     }
4307     if (rvp) {
4308         VCancelReservation_r(rvp);
4309         rvp = NULL;
4310     }
4311     if (client_ec && !*client_ec) {
4312         *client_ec = *ec;
4313     }
4314 #else /* AFS_DEMAND_ATTACH_FS */
4315     /* if no error, bump nUsers */
4316     if (vp) {
4317         vp->nUsers++;
4318     }
4319     if (client_ec) {
4320         *client_ec = *ec;
4321     }
4322 #endif /* AFS_DEMAND_ATTACH_FS */
4323
4324  not_inited:
4325     opr_Assert(vp || *ec);
4326     return vp;
4327 }
4328
4329
4330 /***************************************************/
4331 /* Volume offline/detach routines                  */
4332 /***************************************************/
4333
4334 /* caller MUST hold a heavyweight ref on vp */
4335 #ifdef AFS_DEMAND_ATTACH_FS
4336 void
4337 VTakeOffline_r(Volume * vp)
4338 {
4339     Error error;
4340
4341     opr_Assert(vp->nUsers > 0);
4342     opr_Assert(programType == fileServer);
4343
4344     VCreateReservation_r(vp);
4345     VWaitExclusiveState_r(vp);
4346
4347     vp->goingOffline = 1;
4348     V_needsSalvaged(vp) = 1;
4349
4350     VRequestSalvage_r(&error, vp, SALVSYNC_ERROR, 0);
4351     VCancelReservation_r(vp);
4352 }
4353 #else /* AFS_DEMAND_ATTACH_FS */
4354 void
4355 VTakeOffline_r(Volume * vp)
4356 {
4357     opr_Assert(vp->nUsers > 0);
4358     opr_Assert(programType == fileServer);
4359
4360     vp->goingOffline = 1;
4361     V_needsSalvaged(vp) = 1;
4362 }
4363 #endif /* AFS_DEMAND_ATTACH_FS */
4364
4365 void
4366 VTakeOffline(Volume * vp)
4367 {
4368     VOL_LOCK;
4369     VTakeOffline_r(vp);
4370     VOL_UNLOCK;
4371 }
4372
4373 /**
4374  * force a volume offline.
4375  *
4376  * @param[in] vp     volume object pointer
4377  * @param[in] flags  flags (see note below)
4378  *
4379  * @note the flag VOL_FORCEOFF_NOUPDATE is a recursion control flag
4380  *       used when VUpdateVolume_r needs to call VForceOffline_r
4381  *       (which in turn would normally call VUpdateVolume_r)
4382  *
4383  * @see VUpdateVolume_r
4384  *
4385  * @pre VOL_LOCK must be held.
4386  *      for DAFS, caller must hold ref.
4387  *
4388  * @note for DAFS, it _is safe_ to call this function from an
4389  *       exclusive state
4390  *
4391  * @post needsSalvaged flag is set.
4392  *       for DAFS, salvage is requested.
4393  *       no further references to the volume through the volume
4394  *       package will be honored.
4395  *       all file descriptor and vnode caches are invalidated.
4396  *
4397  * @warning this is a heavy-handed interface.  it results in
4398  *          a volume going offline regardless of the current
4399  *          reference count state.
4400  *
4401  * @internal  volume package internal use only
4402  */
4403 void
4404 VForceOffline_r(Volume * vp, int flags)
4405 {
4406     Error error;
4407     if (!V_inUse(vp)) {
4408 #ifdef AFS_DEMAND_ATTACH_FS
4409         VChangeState_r(vp, VOL_STATE_ERROR);
4410 #endif
4411         return;
4412     }
4413
4414     strcpy(V_offlineMessage(vp),
4415            "Forced offline due to internal error: volume needs to be salvaged");
4416     Log("Volume %" AFS_VOLID_FMT " forced offline:  it needs salvaging!\n", afs_printable_VolumeId_lu(V_id(vp)));
4417
4418     V_inUse(vp) = 0;
4419     vp->goingOffline = 0;
4420     V_needsSalvaged(vp) = 1;
4421     if (!(flags & VOL_FORCEOFF_NOUPDATE)) {
4422         VUpdateVolume_r(&error, vp, VOL_UPDATE_NOFORCEOFF);
4423     }
4424
4425 #ifdef AFS_DEMAND_ATTACH_FS
4426     VRequestSalvage_r(&error, vp, SALVSYNC_ERROR, 0 /*flags*/);
4427 #endif /* AFS_DEMAND_ATTACH_FS */
4428
4429 #ifdef AFS_PTHREAD_ENV
4430     opr_cv_broadcast(&vol_put_volume_cond);
4431 #else /* AFS_PTHREAD_ENV */
4432     LWP_NoYieldSignal(VPutVolume);
4433 #endif /* AFS_PTHREAD_ENV */
4434
4435     VReleaseVolumeHandles_r(vp);
4436 }
4437
4438 /**
4439  * force a volume offline.
4440  *
4441  * @param[in] vp  volume object pointer
4442  *
4443  * @see VForceOffline_r
4444  */
4445 void
4446 VForceOffline(Volume * vp)
4447 {
4448     VOL_LOCK;
4449     VForceOffline_r(vp, 0);
4450     VOL_UNLOCK;
4451 }
4452
4453 /**
4454  * Iterate over the RX calls associated with a volume, and interrupt them.
4455  *
4456  * @param[in] vp The volume whose RX calls we want to scan
4457  *
4458  * @pre VOL_LOCK held
4459  */
4460 static void
4461 VScanCalls_r(struct Volume *vp)
4462 {
4463     struct VCallByVol *cbv, *ncbv;
4464     afs_int32 err;
4465 #ifdef AFS_DEMAND_ATTACH_FS
4466     VolState state_save;
4467 #endif
4468
4469     if (queue_IsEmpty(&vp->rx_call_list))
4470         return; /* no calls to interrupt */
4471     if (!vol_opts.interrupt_rxcall)
4472         return; /* we have no function with which to interrupt calls */
4473     err = VIsGoingOffline_r(vp);
4474     if (!err)
4475         return; /* we're not going offline anymore */
4476
4477 #ifdef AFS_DEMAND_ATTACH_FS
4478     VWaitExclusiveState_r(vp);
4479     state_save = VChangeState_r(vp, VOL_STATE_SCANNING_RXCALLS);
4480     VOL_UNLOCK;
4481 #endif /* AFS_DEMAND_ATTACH_FS */
4482
4483     for(queue_Scan(&vp->rx_call_list, cbv, ncbv, VCallByVol)) {
4484         if (LogLevel > 0) {
4485             struct rx_peer *peer;
4486             char hoststr[16];
4487             peer = rx_PeerOf(rx_ConnectionOf(cbv->call));
4488
4489             Log("Offlining volume %" AFS_VOLID_FMT " while client %s:%u is trying to read "
4490                 "from it; kicking client off with error %ld\n",
4491                 afs_printable_VolumeId_lu(vp->hashid),
4492                 afs_inet_ntoa_r(rx_HostOf(peer), hoststr),
4493                 (unsigned) ntohs(rx_PortOf(peer)),
4494                 (long) err);
4495         }
4496         (*vol_opts.interrupt_rxcall) (cbv->call, err);
4497     }
4498
4499 #ifdef AFS_DEMAND_ATTACH_FS
4500     VOL_LOCK;
4501     VChangeState_r(vp, state_save);
4502 #endif /* AFS_DEMAND_ATTACH_FS */
4503 }
4504
4505 #ifdef AFS_DEMAND_ATTACH_FS
4506 /**
4507  * Wait for a vp to go offline.
4508  *
4509  * @param[out] ec 1 if a salvage on the volume has been requested and
4510  *                salvok == 0, 0 otherwise
4511  * @param[in] vp  The volume to wait for
4512  * @param[in] salvok  If 0, we return immediately with *ec = 1 if the volume
4513  *                    has been requested to salvage. Otherwise we keep waiting
4514  *                    until the volume has gone offline.
4515  *
4516  * @pre VOL_LOCK held
4517  * @pre caller holds a lightweight ref on vp