#endif /* !AFS_HAVE_FFS */
#ifdef AFS_PTHREAD_ENV
+/**
+ * disk partition queue element
+ */
typedef struct diskpartition_queue_t {
- struct rx_queue queue;
- struct DiskPartition64 * diskP;
+ struct rx_queue queue; /**< queue header */
+ struct DiskPartition64 *diskP; /**< disk partition table entry */
} diskpartition_queue_t;
+
+#ifndef AFS_DEMAND_ATTACH_FS
+
typedef struct vinitvolumepackage_thread_t {
struct rx_queue queue;
pthread_cond_t thread_done_cv;
int n_threads_complete;
} vinitvolumepackage_thread_t;
static void * VInitVolumePackageThread(void * args);
+
+#else /* !AFS_DEMAND_ATTTACH_FS */
+#define VINIT_BATCH_MAX_SIZE 512
+
+/**
+ * disk partition work queue
+ */
+struct partition_queue {
+ struct rx_queue head; /**< diskpartition_queue_t queue */
+ pthread_mutex_t mutex;
+ pthread_cond_t cv;
+};
+
+/**
+ * volumes parameters for preattach
+ */
+struct volume_init_batch {
+ struct rx_queue queue; /**< queue header */
+ int thread; /**< posting worker thread */
+ int last; /**< indicates thread is done */
+ int size; /**< number of volume ids in batch */
+ Volume *batch[VINIT_BATCH_MAX_SIZE]; /**< volumes ids to preattach */
+};
+
+/**
+ * volume parameters work queue
+ */
+struct volume_init_queue {
+ struct rx_queue head; /**< volume_init_batch queue */
+ pthread_mutex_t mutex;
+ pthread_cond_t cv;
+};
+
+/**
+ * volume init worker thread parameters
+ */
+struct vinitvolumepackage_thread_param {
+ int nthreads; /**< total number of worker threads */
+ int thread; /**< thread number for this worker thread */
+ struct partition_queue *pq; /**< queue partitions to scan */
+ struct volume_init_queue *vq; /**< queue of volume to preattach */
+};
+
+static void *VInitVolumePackageThread(void *args);
+static struct DiskPartition64 *VInitNextPartition(struct partition_queue *pq);
+static VolId VInitNextVolumeId(DIR *dirp);
+static int VInitPreAttachVolumes(int nthreads, struct volume_init_queue *vq);
+
+#endif /* !AFS_DEMAND_ATTACH_FS */
#endif /* AFS_PTHREAD_ENV */
+#ifndef AFS_DEMAND_ATTACH_FS
static int VAttachVolumesByPartition(struct DiskPartition64 *diskP,
int * nAttached, int * nUnattached);
+#endif /* AFS_DEMAND_ATTACH_FS */
#ifdef AFS_DEMAND_ATTACH_FS
return 0;
}
+
+#if !defined(AFS_PTHREAD_ENV)
+/**
+ * Attach volumes in vice partitions
+ *
+ * @param[in] pt calling program type
+ *
+ * @return 0
+ * @note This is the original, non-threaded version of attach parititions.
+ *
+ * @post VInit state is 2
+ */
+int
+VInitAttachVolumes(ProgramType pt)
+{
+ assert(VInit==1);
+ if (pt == fileServer) {
+ struct DiskPartition64 *diskP;
+ /* Attach all the volumes in this partition */
+ for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
+ int nAttached = 0, nUnattached = 0;
+ assert(VAttachVolumesByPartition(diskP, &nAttached, &nUnattached) == 0);
+ }
+ }
+ VOL_LOCK;
+ VInit = 2; /* Initialized, and all volumes have been attached */
+ LWP_NoYieldSignal(VInitAttachVolumes);
+ VOL_UNLOCK;
+ return 0;
+}
+#endif /* !AFS_PTHREAD_ENV */
+
+#if defined(AFS_PTHREAD_ENV) && !defined(AFS_DEMAND_ATTACH_FS)
+/**
+ * Attach volumes in vice partitions
+ *
+ * @param[in] pt calling program type
+ *
+ * @return 0
+ * @note Threaded version of attach parititions.
+ *
+ * @post VInit state is 2
+ */
int
VInitAttachVolumes(ProgramType pt)
{
assert(VInit==1);
if (pt == fileServer) {
struct DiskPartition64 *diskP;
-#ifdef AFS_PTHREAD_ENV
struct vinitvolumepackage_thread_t params;
struct diskpartition_queue_t * dpq;
int i, threads, parts;
assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
Log("VInitVolumePackage: beginning parallel fileserver startup\n");
-#ifdef AFS_DEMAND_ATTACH_FS
- Log("VInitVolumePackage: using %d threads to pre-attach volumes on %d partitions\n",
- threads, parts);
-#else /* AFS_DEMAND_ATTACH_FS */
Log("VInitVolumePackage: using %d threads to attach volumes on %d partitions\n",
threads, parts);
-#endif /* AFS_DEMAND_ATTACH_FS */
VOL_LOCK;
for (i=0; i < threads; i++) {
/* if we're only going to run one init thread, don't bother creating
* another LWP */
Log("VInitVolumePackage: beginning single-threaded fileserver startup\n");
-#ifdef AFS_DEMAND_ATTACH_FS
- Log("VInitVolumePackage: using 1 thread to pre-attach volumes on %d partition(s)\n",
- parts);
-#else /* AFS_DEMAND_ATTACH_FS */
Log("VInitVolumePackage: using 1 thread to attach volumes on %d partition(s)\n",
parts);
-#endif /* AFS_DEMAND_ATTACH_FS */
VInitVolumePackageThread(¶ms);
}
assert(pthread_cond_destroy(¶ms.thread_done_cv) == 0);
-
-#else /* AFS_PTHREAD_ENV */
-
- /* Attach all the volumes in this partition */
- for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
- int nAttached = 0, nUnattached = 0;
- assert(VAttachVolumesByPartition(diskP, &nAttached, &nUnattached) == 0);
- }
-#endif /* AFS_PTHREAD_ENV */
}
VOL_LOCK;
VInit = 2; /* Initialized, and all volumes have been attached */
-#ifdef AFS_PTHREAD_ENV
assert(pthread_cond_broadcast(&vol_init_attach_cond) == 0);
-#else
- LWP_NoYieldSignal(VInitAttachVolumes);
-#endif /* AFS_PTHREAD_ENV */
VOL_UNLOCK;
return 0;
}
-#ifdef AFS_PTHREAD_ENV
static void *
VInitVolumePackageThread(void * args) {
VOL_UNLOCK;
return NULL;
}
-#endif /* AFS_PTHREAD_ENV */
+#endif /* AFS_PTHREAD_ENV && !AFS_DEMAND_ATTACH_FS */
+#if defined(AFS_DEMAND_ATTACH_FS)
+/**
+ * Attach volumes in vice partitions
+ *
+ * @param[in] pt calling program type
+ *
+ * @return 0
+ * @note Threaded version of attach partitions.
+ *
+ * @post VInit state is 2
+ */
+int
+VInitAttachVolumes(ProgramType pt)
+{
+ assert(VInit==1);
+ if (pt == fileServer) {
+
+ struct DiskPartition64 *diskP;
+ struct partition_queue pq;
+ struct volume_init_queue vq;
+
+ int i, threads, parts;
+ pthread_t tid;
+ pthread_attr_t attrs;
+
+ /* create partition work queue */
+ queue_Init(&pq);
+ assert(pthread_cond_init(&(pq.cv), NULL) == 0);
+ assert(pthread_mutex_init(&(pq.mutex), NULL) == 0);
+ for (parts = 0, diskP = DiskPartitionList; diskP; diskP = diskP->next, parts++) {
+ struct diskpartition_queue_t *dp;
+ dp = (struct diskpartition_queue_t*)malloc(sizeof(struct diskpartition_queue_t));
+ assert(dp != NULL);
+ dp->diskP = diskP;
+ queue_Append(&pq, dp);
+ }
+
+ /* number of worker threads; at least one, not to exceed the number of partitions */
+ threads = MIN(parts, vol_attach_threads);
+
+ /* create volume work queue */
+ queue_Init(&vq);
+ assert(pthread_cond_init(&(vq.cv), NULL) == 0);
+ assert(pthread_mutex_init(&(vq.mutex), NULL) == 0);
+
+ assert(pthread_attr_init(&attrs) == 0);
+ assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
+
+ Log("VInitVolumePackage: beginning parallel fileserver startup\n");
+ Log("VInitVolumePackage: using %d threads to pre-attach volumes on %d partitions\n",
+ threads, parts);
+
+ /* create threads to scan disk partitions. */
+ for (i=0; i < threads; i++) {
+ struct vinitvolumepackage_thread_param *params;
+ params = (struct vinitvolumepackage_thread_param *)malloc(sizeof(struct vinitvolumepackage_thread_param));
+ assert(params);
+ params->pq = &pq;
+ params->vq = &vq;
+ params->nthreads = threads;
+ params->thread = i+1;
+
+ AFS_SIGSET_DECL;
+ AFS_SIGSET_CLEAR();
+ assert(pthread_create (&tid, &attrs, &VInitVolumePackageThread, (void*)params) == 0);
+ AFS_SIGSET_RESTORE();
+ }
+
+ VInitPreAttachVolumes(threads, &vq);
+
+ assert(pthread_attr_destroy(&attrs) == 0);
+ assert(pthread_cond_destroy(&pq.cv) == 0);
+ assert(pthread_mutex_destroy(&pq.mutex) == 0);
+ assert(pthread_cond_destroy(&vq.cv) == 0);
+ assert(pthread_mutex_destroy(&vq.mutex) == 0);
+ }
+
+ VOL_LOCK;
+ VInit = 2; /* Initialized, and all volumes have been attached */
+ assert(pthread_cond_broadcast(&vol_init_attach_cond) == 0);
+ VOL_UNLOCK;
+
+ return 0;
+}
+
+/**
+ * Volume package initialization worker thread. Scan partitions for volume
+ * header files. Gather batches of volume ids and dispatch them to
+ * the main thread to be preattached. The volume preattachement is done
+ * in the main thread to avoid global volume lock contention.
+ */
+static void *
+VInitVolumePackageThread(void *args)
+{
+ struct vinitvolumepackage_thread_param *params;
+ struct DiskPartition64 *partition;
+ struct partition_queue *pq;
+ struct volume_init_queue *vq;
+ struct volume_init_batch *vb;
+
+ assert(args);
+ params = (struct vinitvolumepackage_thread_param *)args;
+ pq = params->pq;
+ vq = params->vq;
+ assert(pq);
+ assert(vq);
+
+ vb = (struct volume_init_batch*)malloc(sizeof(struct volume_init_batch));
+ assert(vb);
+ vb->thread = params->thread;
+ vb->last = 0;
+ vb->size = 0;
+
+ Log("Scanning partitions on thread %d of %d\n", params->thread, params->nthreads);
+ while((partition = VInitNextPartition(pq))) {
+ DIR *dirp;
+ VolId vid;
+
+ Log("Partition %s: pre-attaching volumes\n", partition->name);
+ dirp = opendir(VPartitionPath(partition));
+ if (!dirp) {
+ Log("opendir on Partition %s failed, errno=%d!\n", partition->name, errno);
+ continue;
+ }
+ while ((vid = VInitNextVolumeId(dirp))) {
+ Volume *vp = (Volume*)malloc(sizeof(Volume));
+ assert(vp);
+ memset(vp, 0, sizeof(Volume));
+ vp->device = partition->device;
+ vp->partition = partition;
+ vp->hashid = vid;
+ queue_Init(&vp->vnode_list);
+ assert(pthread_cond_init(&V_attachCV(vp), NULL) == 0);
+
+ vb->batch[vb->size++] = vp;
+ if (vb->size == VINIT_BATCH_MAX_SIZE) {
+ assert(pthread_mutex_lock(&vq->mutex) == 0);
+ queue_Append(vq, vb);
+ assert(pthread_cond_broadcast(&vq->cv) == 0);
+ assert(pthread_mutex_unlock(&vq->mutex) == 0);
+
+ vb = (struct volume_init_batch*)malloc(sizeof(struct volume_init_batch));
+ assert(vb);
+ vb->thread = params->thread;
+ vb->size = 0;
+ vb->last = 0;
+ }
+ }
+ closedir(dirp);
+ }
+
+ vb->last = 1;
+ assert(pthread_mutex_lock(&vq->mutex) == 0);
+ queue_Append(vq, vb);
+ assert(pthread_cond_broadcast(&vq->cv) == 0);
+ assert(pthread_mutex_unlock(&vq->mutex) == 0);
+
+ Log("Partition scan thread %d of %d ended\n", params->thread, params->nthreads);
+ free(params);
+ return NULL;
+}
+
+/**
+ * Read next element from the pre-populated partition list.
+ */
+static struct DiskPartition64*
+VInitNextPartition(struct partition_queue *pq)
+{
+ struct DiskPartition64 *partition;
+ struct diskpartition_queue_t *dp; /* queue element */
+
+ if (vinit_attach_abort) {
+ Log("Aborting volume preattach thread.\n");
+ return NULL;
+ }
+
+ /* get next partition to scan */
+ assert(pthread_mutex_lock(&pq->mutex) == 0);
+ if (queue_IsEmpty(pq)) {
+ assert(pthread_mutex_unlock(&pq->mutex) == 0);
+ return NULL;
+ }
+ dp = queue_First(pq, diskpartition_queue_t);
+ queue_Remove(dp);
+ assert(pthread_mutex_unlock(&pq->mutex) == 0);
+
+ assert(dp);
+ assert(dp->diskP);
+
+ partition = dp->diskP;
+ free(dp);
+ return partition;
+}
+
+/**
+ * Find next volume id on the partition.
+ */
+static VolId
+VInitNextVolumeId(DIR *dirp)
+{
+ struct dirent *d;
+ VolId vid = 0;
+ char *ext;
+
+ while((d = readdir(dirp))) {
+ if (vinit_attach_abort) {
+ Log("Aborting volume preattach thread.\n");
+ break;
+ }
+ ext = strrchr(d->d_name, '.');
+ if (d->d_name[0] == 'V' && ext && strcmp(ext, VHDREXT) == 0) {
+ vid = VolumeNumber(d->d_name);
+ if (vid) {
+ break;
+ }
+ Log("Warning: bogus volume header file: %s\n", d->d_name);
+ }
+ }
+ return vid;
+}
+
+/**
+ * Preattach volumes in batches to avoid lock contention.
+ */
+static int
+VInitPreAttachVolumes(int nthreads, struct volume_init_queue *vq)
+{
+ struct volume_init_batch *vb;
+ int i;
+
+ while (nthreads) {
+ /* dequeue next volume */
+ pthread_mutex_lock(&vq->mutex);
+ if (queue_IsEmpty(vq)) {
+ pthread_cond_wait(&vq->cv, &vq->mutex);
+ }
+ vb = queue_First(vq, volume_init_batch);
+ queue_Remove(vb);
+ pthread_mutex_unlock(&vq->mutex);
+
+ if (vb->size) {
+ VOL_LOCK;
+ for (i = 0; i<vb->size; i++) {
+ Volume *vp;
+ Volume *dup;
+ Error ec = 0;
+
+ vp = vb->batch[i];
+ dup = VLookupVolume_r(&ec, vp->hashid, NULL);
+ if (ec) {
+ Log("Error looking up volume, code=%d\n", ec);
+ }
+ else if (dup) {
+ Log("Warning: Duplicate volume id %d detected.\n", vp->hashid);
+ }
+ else {
+ /* put pre-attached volume onto the hash table
+ * and bring it up to the pre-attached state */
+ AddVolumeToHashTable(vp, vp->hashid);
+ AddVolumeToVByPList_r(vp);
+ VLRU_Init_Node_r(vp);
+ VChangeState_r(vp, VOL_STATE_PREATTACHED);
+ }
+ }
+ VOL_UNLOCK;
+ }
+
+ if (vb->last) {
+ nthreads--;
+ }
+ free(vb);
+ }
+ return 0;
+}
+#endif /* AFS_DEMAND_ATTACH_FS */
+
+#if !defined(AFS_DEMAND_ATTACH_FS)
/*
* attach all volumes on a given disk partition
*/
if (p != NULL && strcmp(p, VHDREXT) == 0) {
Error error;
Volume *vp;
-#ifdef AFS_DEMAND_ATTACH_FS
- vp = VPreAttachVolumeByName(&error, diskP->name, dp->d_name);
-#else /* AFS_DEMAND_ATTACH_FS */
vp = VAttachVolumeByName(&error, diskP->name, dp->d_name,
V_VOLUPD);
-#endif /* AFS_DEMAND_ATTACH_FS */
(*(vp ? nAttached : nUnattached))++;
if (error == VOFFLINE)
Log("Volume %d stays offline (/vice/offline/%s exists)\n", VolumeNumber(dp->d_name), dp->d_name);
diskP->name, VolumeNumber(dp->d_name),
dp->d_name);
}
-#if !defined(AFS_DEMAND_ATTACH_FS)
if (vp) {
VPutVolume(vp);
}
-#endif /* AFS_DEMAND_ATTACH_FS */
}
}
closedir(dirp);
return ret;
}
-
+#endif /* !AFS_DEMAND_ATTACH_FS */
/***************************************************/
/* Shutdown routines */