DAFS: avoid volume lock contention during initialization
authorMichael Meffie <mmeffie@sinenomine.net>
Tue, 12 Jan 2010 02:16:06 +0000 (21:16 -0500)
committerDerrick Brashear <shadow@dementia.org>
Tue, 13 Apr 2010 11:41:40 +0000 (04:41 -0700)
Avoid the excessive volume lock contention during startup to
improve the time to pre-attach a very large number of volumes.
The parallel attach worker threads avoid the volume lock
while scanning the partitions for volumes and send batches of
volume ids to the main thread to be preattached under the
volume lock.

FIXES 124489

Change-Id: Ieb33a3bdd5b06349abd9c3dd994c620021cd6194
Reviewed-on: http://gerrit.openafs.org/1092
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Derrick Brashear <shadow@dementia.org>

src/vol/volume.c

index 1a906b4..990327f 100644 (file)
@@ -284,20 +284,77 @@ ffs(x)
 #endif /* !AFS_HAVE_FFS */
 
 #ifdef AFS_PTHREAD_ENV
+/**
+ * disk partition queue element
+ */
 typedef struct diskpartition_queue_t {
-    struct rx_queue queue;
-    struct DiskPartition64 * diskP;
+    struct rx_queue queue;             /**< queue header */
+    struct DiskPartition64 *diskP;     /**< disk partition table entry */
 } diskpartition_queue_t;
+
+#ifndef AFS_DEMAND_ATTACH_FS
+
 typedef struct vinitvolumepackage_thread_t {
     struct rx_queue queue;
     pthread_cond_t thread_done_cv;
     int n_threads_complete;
 } vinitvolumepackage_thread_t;
 static void * VInitVolumePackageThread(void * args);
+
+#else  /* !AFS_DEMAND_ATTTACH_FS */
+#define VINIT_BATCH_MAX_SIZE 512
+
+/**
+ * disk partition work queue
+ */
+struct partition_queue {
+    struct rx_queue head;              /**< diskpartition_queue_t queue */
+    pthread_mutex_t mutex;
+    pthread_cond_t cv;
+};
+
+/**
+ * volumes parameters for preattach
+ */
+struct volume_init_batch {
+    struct rx_queue queue;               /**< queue header */
+    int thread;                          /**< posting worker thread */
+    int last;                            /**< indicates thread is done */
+    int size;                            /**< number of volume ids in batch */
+    Volume *batch[VINIT_BATCH_MAX_SIZE]; /**< volumes ids to preattach */
+};
+
+/**
+ * volume parameters work queue
+ */
+struct volume_init_queue {
+    struct rx_queue head;                /**< volume_init_batch queue */
+    pthread_mutex_t mutex;
+    pthread_cond_t cv;
+};
+
+/**
+ * volume init worker thread parameters
+ */
+struct vinitvolumepackage_thread_param {
+    int nthreads;                        /**< total number of worker threads */
+    int thread;                          /**< thread number for this worker thread */
+    struct partition_queue *pq;          /**< queue partitions to scan */
+    struct volume_init_queue *vq;        /**< queue of volume to preattach */
+};
+
+static void *VInitVolumePackageThread(void *args);
+static struct DiskPartition64 *VInitNextPartition(struct partition_queue *pq);
+static VolId VInitNextVolumeId(DIR *dirp);
+static int VInitPreAttachVolumes(int nthreads, struct volume_init_queue *vq);
+
+#endif /* !AFS_DEMAND_ATTACH_FS */
 #endif /* AFS_PTHREAD_ENV */
 
+#ifndef AFS_DEMAND_ATTACH_FS
 static int VAttachVolumesByPartition(struct DiskPartition64 *diskP, 
                                     int * nAttached, int * nUnattached);
+#endif /* AFS_DEMAND_ATTACH_FS */
 
 
 #ifdef AFS_DEMAND_ATTACH_FS
@@ -572,13 +629,55 @@ VInitVolumePackage2(ProgramType pt, VolumePackageOptions * opts)
     return 0;
 }
 
+
+#if !defined(AFS_PTHREAD_ENV)
+/**
+ * Attach volumes in vice partitions
+ *
+ * @param[in]  pt         calling program type
+ *
+ * @return 0
+ * @note This is the original, non-threaded version of attach parititions.
+ *
+ * @post VInit state is 2
+ */
+int
+VInitAttachVolumes(ProgramType pt)
+{
+    assert(VInit==1);
+    if (pt == fileServer) {
+       struct DiskPartition64 *diskP;
+       /* Attach all the volumes in this partition */
+       for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
+           int nAttached = 0, nUnattached = 0;
+           assert(VAttachVolumesByPartition(diskP, &nAttached, &nUnattached) == 0);
+       }
+    }
+    VOL_LOCK;
+    VInit = 2;                 /* Initialized, and all volumes have been attached */
+    LWP_NoYieldSignal(VInitAttachVolumes);
+    VOL_UNLOCK;
+    return 0;
+}
+#endif /* !AFS_PTHREAD_ENV */
+
+#if defined(AFS_PTHREAD_ENV) && !defined(AFS_DEMAND_ATTACH_FS)
+/**
+ * Attach volumes in vice partitions
+ *
+ * @param[in]  pt         calling program type
+ *
+ * @return 0
+ * @note Threaded version of attach parititions.
+ *
+ * @post VInit state is 2
+ */
 int
 VInitAttachVolumes(ProgramType pt)
 {
     assert(VInit==1);
     if (pt == fileServer) {
        struct DiskPartition64 *diskP;
-#ifdef AFS_PTHREAD_ENV
        struct vinitvolumepackage_thread_t params;
        struct diskpartition_queue_t * dpq;
        int i, threads, parts;
@@ -605,13 +704,8 @@ VInitAttachVolumes(ProgramType pt)
            assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
 
            Log("VInitVolumePackage: beginning parallel fileserver startup\n");
-#ifdef AFS_DEMAND_ATTACH_FS
-           Log("VInitVolumePackage: using %d threads to pre-attach volumes on %d partitions\n",
-               threads, parts);
-#else /* AFS_DEMAND_ATTACH_FS */
            Log("VInitVolumePackage: using %d threads to attach volumes on %d partitions\n",
                threads, parts);
-#endif /* AFS_DEMAND_ATTACH_FS */
 
            VOL_LOCK;
            for (i=0; i < threads; i++) {
@@ -633,40 +727,21 @@ VInitAttachVolumes(ProgramType pt)
            /* if we're only going to run one init thread, don't bother creating
             * another LWP */
            Log("VInitVolumePackage: beginning single-threaded fileserver startup\n");
-#ifdef AFS_DEMAND_ATTACH_FS
-           Log("VInitVolumePackage: using 1 thread to pre-attach volumes on %d partition(s)\n",
-               parts);
-#else /* AFS_DEMAND_ATTACH_FS */
            Log("VInitVolumePackage: using 1 thread to attach volumes on %d partition(s)\n",
                parts);
-#endif /* AFS_DEMAND_ATTACH_FS */
 
            VInitVolumePackageThread(&params);
        }
 
        assert(pthread_cond_destroy(&params.thread_done_cv) == 0);
-
-#else /* AFS_PTHREAD_ENV */
-
-       /* Attach all the volumes in this partition */
-       for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
-           int nAttached = 0, nUnattached = 0;
-           assert(VAttachVolumesByPartition(diskP, &nAttached, &nUnattached) == 0);
-       }
-#endif /* AFS_PTHREAD_ENV */
     }
     VOL_LOCK;
     VInit = 2;                 /* Initialized, and all volumes have been attached */
-#ifdef AFS_PTHREAD_ENV
     assert(pthread_cond_broadcast(&vol_init_attach_cond) == 0);
-#else
-    LWP_NoYieldSignal(VInitAttachVolumes);
-#endif /* AFS_PTHREAD_ENV */
     VOL_UNLOCK;
     return 0;
 }
 
-#ifdef AFS_PTHREAD_ENV
 static void *
 VInitVolumePackageThread(void * args) {
 
@@ -704,8 +779,285 @@ done:
     VOL_UNLOCK;
     return NULL;
 }
-#endif /* AFS_PTHREAD_ENV */
+#endif /* AFS_PTHREAD_ENV && !AFS_DEMAND_ATTACH_FS */
 
+#if defined(AFS_DEMAND_ATTACH_FS)
+/**
+ * Attach volumes in vice partitions
+ *
+ * @param[in]  pt         calling program type
+ *
+ * @return 0
+ * @note Threaded version of attach partitions.
+ *
+ * @post VInit state is 2
+ */
+int
+VInitAttachVolumes(ProgramType pt)
+{
+    assert(VInit==1);
+    if (pt == fileServer) {
+
+       struct DiskPartition64 *diskP;
+       struct partition_queue pq;
+        struct volume_init_queue vq;
+
+       int i, threads, parts;
+       pthread_t tid;
+       pthread_attr_t attrs;
+
+       /* create partition work queue */
+        queue_Init(&pq);
+        assert(pthread_cond_init(&(pq.cv), NULL) == 0);
+        assert(pthread_mutex_init(&(pq.mutex), NULL) == 0);
+       for (parts = 0, diskP = DiskPartitionList; diskP; diskP = diskP->next, parts++) {
+           struct diskpartition_queue_t *dp;
+           dp = (struct diskpartition_queue_t*)malloc(sizeof(struct diskpartition_queue_t));
+           assert(dp != NULL);
+           dp->diskP = diskP;
+           queue_Append(&pq, dp);
+       }
+
+        /* number of worker threads; at least one, not to exceed the number of partitions */
+       threads = MIN(parts, vol_attach_threads);
+
+        /* create volume work queue */
+        queue_Init(&vq);
+        assert(pthread_cond_init(&(vq.cv), NULL) == 0);
+        assert(pthread_mutex_init(&(vq.mutex), NULL) == 0);
+
+        assert(pthread_attr_init(&attrs) == 0);
+        assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
+
+        Log("VInitVolumePackage: beginning parallel fileserver startup\n");
+        Log("VInitVolumePackage: using %d threads to pre-attach volumes on %d partitions\n",
+               threads, parts);
+
+        /* create threads to scan disk partitions. */
+       for (i=0; i < threads; i++) {
+           struct vinitvolumepackage_thread_param *params;
+           params = (struct vinitvolumepackage_thread_param *)malloc(sizeof(struct vinitvolumepackage_thread_param));
+            assert(params);
+            params->pq = &pq;
+            params->vq = &vq;
+            params->nthreads = threads;
+            params->thread = i+1;
+
+            AFS_SIGSET_DECL;
+            AFS_SIGSET_CLEAR();
+           assert(pthread_create (&tid, &attrs, &VInitVolumePackageThread, (void*)params) == 0);
+            AFS_SIGSET_RESTORE();
+       }
+
+        VInitPreAttachVolumes(threads, &vq);
+
+        assert(pthread_attr_destroy(&attrs) == 0);
+        assert(pthread_cond_destroy(&pq.cv) == 0);
+        assert(pthread_mutex_destroy(&pq.mutex) == 0);
+        assert(pthread_cond_destroy(&vq.cv) == 0);
+        assert(pthread_mutex_destroy(&vq.mutex) == 0);
+    }
+
+    VOL_LOCK;
+    VInit = 2;                 /* Initialized, and all volumes have been attached */
+    assert(pthread_cond_broadcast(&vol_init_attach_cond) == 0);
+    VOL_UNLOCK;
+
+    return 0;
+}
+
+/**
+ * Volume package initialization worker thread. Scan partitions for volume
+ * header files. Gather batches of volume ids and dispatch them to
+ * the main thread to be preattached.  The volume preattachement is done
+ * in the main thread to avoid global volume lock contention.
+ */
+static void *
+VInitVolumePackageThread(void *args)
+{
+    struct vinitvolumepackage_thread_param *params;
+    struct DiskPartition64 *partition;
+    struct partition_queue *pq;
+    struct volume_init_queue *vq;
+    struct volume_init_batch *vb;
+
+    assert(args);
+    params = (struct vinitvolumepackage_thread_param *)args;
+    pq = params->pq;
+    vq = params->vq;
+    assert(pq);
+    assert(vq);
+
+    vb = (struct volume_init_batch*)malloc(sizeof(struct volume_init_batch));
+    assert(vb);
+    vb->thread = params->thread;
+    vb->last = 0;
+    vb->size = 0;
+
+    Log("Scanning partitions on thread %d of %d\n", params->thread, params->nthreads);
+    while((partition = VInitNextPartition(pq))) {
+        DIR *dirp;
+        VolId vid;
+
+        Log("Partition %s: pre-attaching volumes\n", partition->name);
+        dirp = opendir(VPartitionPath(partition));
+        if (!dirp) {
+            Log("opendir on Partition %s failed, errno=%d!\n", partition->name, errno);
+            continue;
+        }
+        while ((vid = VInitNextVolumeId(dirp))) {
+            Volume *vp = (Volume*)malloc(sizeof(Volume));
+            assert(vp);
+            memset(vp, 0, sizeof(Volume));
+            vp->device = partition->device;
+            vp->partition = partition;
+            vp->hashid = vid;
+            queue_Init(&vp->vnode_list);
+            assert(pthread_cond_init(&V_attachCV(vp), NULL) == 0);
+
+            vb->batch[vb->size++] = vp;
+            if (vb->size == VINIT_BATCH_MAX_SIZE) {
+                assert(pthread_mutex_lock(&vq->mutex) == 0);
+                queue_Append(vq, vb);
+                assert(pthread_cond_broadcast(&vq->cv) == 0);
+                assert(pthread_mutex_unlock(&vq->mutex) == 0);
+
+                vb = (struct volume_init_batch*)malloc(sizeof(struct volume_init_batch));
+                assert(vb);
+                vb->thread = params->thread;
+                vb->size = 0;
+                vb->last = 0;
+            }
+        }
+        closedir(dirp);
+    }
+
+    vb->last = 1;
+    assert(pthread_mutex_lock(&vq->mutex) == 0);
+    queue_Append(vq, vb);
+    assert(pthread_cond_broadcast(&vq->cv) == 0);
+    assert(pthread_mutex_unlock(&vq->mutex) == 0);
+
+    Log("Partition scan thread %d of %d ended\n", params->thread, params->nthreads);
+    free(params);
+    return NULL;
+}
+
+/**
+ * Read next element from the pre-populated partition list.
+ */
+static struct DiskPartition64*
+VInitNextPartition(struct partition_queue *pq)
+{
+    struct DiskPartition64 *partition;
+    struct diskpartition_queue_t *dp; /* queue element */
+
+    if (vinit_attach_abort) {
+        Log("Aborting volume preattach thread.\n");
+        return NULL;
+    }
+
+    /* get next partition to scan */
+    assert(pthread_mutex_lock(&pq->mutex) == 0);
+    if (queue_IsEmpty(pq)) {
+        assert(pthread_mutex_unlock(&pq->mutex) == 0);
+        return NULL;
+    }
+    dp = queue_First(pq, diskpartition_queue_t);
+    queue_Remove(dp);
+    assert(pthread_mutex_unlock(&pq->mutex) == 0);
+
+    assert(dp);
+    assert(dp->diskP);
+
+    partition = dp->diskP;
+    free(dp);
+    return partition;
+}
+
+/**
+ * Find next volume id on the partition.
+ */
+static VolId
+VInitNextVolumeId(DIR *dirp)
+{
+    struct dirent *d;
+    VolId vid = 0;
+    char *ext;
+
+    while((d = readdir(dirp))) {
+        if (vinit_attach_abort) {
+            Log("Aborting volume preattach thread.\n");
+            break;
+        }
+        ext = strrchr(d->d_name, '.');
+        if (d->d_name[0] == 'V' && ext && strcmp(ext, VHDREXT) == 0) {
+            vid = VolumeNumber(d->d_name);
+            if (vid) {
+               break;
+            }
+            Log("Warning: bogus volume header file: %s\n", d->d_name);
+        }
+    }
+    return vid;
+}
+
+/**
+ * Preattach volumes in batches to avoid lock contention.
+ */
+static int
+VInitPreAttachVolumes(int nthreads, struct volume_init_queue *vq)
+{
+    struct volume_init_batch *vb;
+    int i;
+
+    while (nthreads) {
+        /* dequeue next volume */
+        pthread_mutex_lock(&vq->mutex);
+        if (queue_IsEmpty(vq)) {
+            pthread_cond_wait(&vq->cv, &vq->mutex);
+        }
+        vb = queue_First(vq, volume_init_batch);
+        queue_Remove(vb);
+        pthread_mutex_unlock(&vq->mutex);
+
+        if (vb->size) {
+            VOL_LOCK;
+            for (i = 0; i<vb->size; i++) {
+                Volume *vp;
+                Volume *dup;
+                Error ec = 0;
+
+                vp = vb->batch[i];
+               dup = VLookupVolume_r(&ec, vp->hashid, NULL);
+                if (ec) {
+                    Log("Error looking up volume, code=%d\n", ec);
+                }
+                else if (dup) {
+                    Log("Warning: Duplicate volume id %d detected.\n", vp->hashid);
+                }
+                else {
+                    /* put pre-attached volume onto the hash table
+                     * and bring it up to the pre-attached state */
+                    AddVolumeToHashTable(vp, vp->hashid);
+                    AddVolumeToVByPList_r(vp);
+                    VLRU_Init_Node_r(vp);
+                    VChangeState_r(vp, VOL_STATE_PREATTACHED);
+                }
+            }
+            VOL_UNLOCK;
+        }
+
+        if (vb->last) {
+            nthreads--;
+        }
+        free(vb);
+    }
+    return 0;
+}
+#endif /* AFS_DEMAND_ATTACH_FS */
+
+#if !defined(AFS_DEMAND_ATTACH_FS)
 /*
  * attach all volumes on a given disk partition
  */
@@ -735,12 +1087,8 @@ VAttachVolumesByPartition(struct DiskPartition64 *diskP, int * nAttached, int *
     if (p != NULL && strcmp(p, VHDREXT) == 0) {
       Error error;
       Volume *vp;
-#ifdef AFS_DEMAND_ATTACH_FS
-      vp = VPreAttachVolumeByName(&error, diskP->name, dp->d_name);
-#else /* AFS_DEMAND_ATTACH_FS */
       vp = VAttachVolumeByName(&error, diskP->name, dp->d_name,
                               V_VOLUPD);
-#endif /* AFS_DEMAND_ATTACH_FS */
       (*(vp ? nAttached : nUnattached))++;
       if (error == VOFFLINE)
        Log("Volume %d stays offline (/vice/offline/%s exists)\n", VolumeNumber(dp->d_name), dp->d_name);
@@ -749,11 +1097,9 @@ VAttachVolumesByPartition(struct DiskPartition64 *diskP, int * nAttached, int *
            diskP->name, VolumeNumber(dp->d_name),
            dp->d_name);
       }
-#if !defined(AFS_DEMAND_ATTACH_FS)
       if (vp) {
        VPutVolume(vp);
       }
-#endif /* AFS_DEMAND_ATTACH_FS */
     }
   }
 
@@ -762,7 +1108,7 @@ done:
   closedir(dirp);
   return ret;
 }
-
+#endif /* !AFS_DEMAND_ATTACH_FS */
 
 /***************************************************/
 /* Shutdown routines                               */