Windows: cm_FindVolumeByName refactoring
authorJeffrey Altman <jaltman@your-file-system.com>
Wed, 31 Oct 2012 13:53:57 +0000 (09:53 -0400)
committerJeffrey Altman <jaltman@your-file-system.com>
Wed, 31 Oct 2012 22:17:15 +0000 (15:17 -0700)
The cm_volume allocation within cm_FindVolumeByName() was racy.
Given how locks were obtained and dropped it was possible for two
threads to both determine that a cm_volume_t object needed to be
allocated.  It might even have been possible for two threads to
attempt to allocate the same object.

This refactoring ensures that if a volume cannot be found under
a read lock that a second search is performed under the write lock
in case the object had in fact been allocated during the transition.

Once it is determined that an allocation is required, the cm_volumeLock
is not dropped until the object has been built and inserted into the
name hash table.   This ensures that two threads cannot attempt to
allocate a cm_volume object for the same volume group.

InterlockedIncrement is used to manage the cm_data volume count.

Change-Id: I64c07cbc0f7968c5580478ff33214f67088072f8
Reviewed-on: http://gerrit.openafs.org/8346
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Jeffrey Altman <jaltman@your-file-system.com>
Tested-by: Jeffrey Altman <jaltman@your-file-system.com>

src/WINNT/afsd/cm_volume.c

index 7f682b0..a63bd8f 100644 (file)
@@ -923,96 +923,109 @@ long cm_FindVolumeByName(struct cm_cell *cellp, char *volumeNamep,
          */
         lock_ConvertRToW(&cm_volumeLock);
 
-       if ( cm_data.currentVolumes >= cm_data.maxVolumes ) {
-#ifdef RECYCLE_FROM_ALL_VOLUMES_LIST
-           for (volp = cm_data.allVolumesp; volp; volp=volp->allNextp) {
-               if ( volp->refCount == 0 ) {
-                   /* There is one we can re-use */
-                   break;
-               }
-           }
-#else
-            for ( volp = cm_data.volumeLRULastp;
-                  volp;
-                  volp = (cm_volume_t *) osi_QPrev(&volp->q))
-            {
-               if ( volp->refCount == 0 ) {
-                   /* There is one we can re-use */
-                   break;
-               }
-            }
-#endif
-           if (!volp)
-               osi_panic("Exceeded Max Volumes", __FILE__, __LINE__);
+        /*
+         * While the lock was converted it may have been dropped
+         * Search again now that we are exclusive.
+         */
+        for (volp = cm_data.volumeNameHashTablep[hash]; volp; volp = volp->nameNextp) {
+            if (cellp == volp->cellp && strcmp(name, volp->namep) == 0)
+                break;
+        }
 
-            InterlockedIncrement(&volp->refCount);
+        if (volp) {
+            cm_GetVolume(volp);
             lock_ReleaseWrite(&cm_volumeLock);
             lock_ObtainWrite(&volp->rw);
-            lock_ObtainWrite(&cm_volumeLock);
+        } else {
+            if ( cm_data.currentVolumes >= cm_data.maxVolumes ) {
+#ifdef RECYCLE_FROM_ALL_VOLUMES_LIST
+                for (volp = cm_data.allVolumesp; volp; volp=volp->allNextp) {
+                    if ( volp->refCount == 0 ) {
+                        /* There is one we can re-use */
+                        break;
+                    }
+                }
+#else
+                for ( volp = cm_data.volumeLRULastp;
+                      volp;
+                      volp = (cm_volume_t *) osi_QPrev(&volp->q))
+                {
+                    if ( volp->refCount == 0 ) {
+                        /* There is one we can re-use */
+                        break;
+                    }
+                }
+#endif
+                if (!volp)
+                    osi_panic("Exceeded Max Volumes", __FILE__, __LINE__);
 
-            osi_Log2(afsd_logp, "Recycling Volume %s:%s",
-                     volp->cellp->name, volp->namep);
+                osi_Log2(afsd_logp, "Recycling Volume %s:%s",
+                         volp->cellp->name, volp->namep);
 
-            /* The volp is removed from the LRU queue in order to
-             * prevent two threads from attempting to recycle the
-             * same object.  This volp must be re-inserted back into
-             * the LRU queue before this function exits.
+                /* The volp is removed from the LRU queue in order to
+                 * prevent two threads from attempting to recycle the
+                 * same object.  This volp must be re-inserted back into
+                 * the LRU queue before this function exits.
+                 */
+                if (volp->qflags & CM_VOLUME_QFLAG_IN_LRU_QUEUE)
+                    cm_RemoveVolumeFromLRU(volp);
+                if (volp->qflags & CM_VOLUME_QFLAG_IN_HASH)
+                    cm_RemoveVolumeFromNameHashTable(volp);
+
+                for ( volType = RWVOL; volType < NUM_VOL_TYPES; volType++) {
+                    if (volp->vol[volType].qflags & CM_VOLUME_QFLAG_IN_HASH)
+                        cm_RemoveVolumeFromIDHashTable(volp, volType);
+                    if (volp->vol[volType].ID)
+                        cm_VolumeStatusNotification(volp, volp->vol[volType].ID, volp->vol[volType].state, vl_unknown);
+                    volp->vol[volType].ID = 0;
+                    cm_SetFid(&volp->vol[volType].dotdotFid, 0, 0, 0, 0);
+                }
+            } else {
+                volp = &cm_data.volumeBaseAddress[InterlockedIncrement(&cm_data.currentVolumes) - 1];
+                memset(volp, 0, sizeof(cm_volume_t));
+                volp->magic = CM_VOLUME_MAGIC;
+                volp->allNextp = cm_data.allVolumesp;
+                cm_data.allVolumesp = volp;
+                lock_InitializeRWLock(&volp->rw, "cm_volume_t rwlock", LOCK_HIERARCHY_VOLUME);
+            }
+            /*
+             * no one else can find this object and we have not dropped
+             * cm_volumeLock in any case.  The object is either new or
+             * recycled.  Initialize its new values and put it into the
+             * name hash table before dropping cm_volumeLock which makes
+             * it visible to competing threads.
              */
-            if (volp->qflags & CM_VOLUME_QFLAG_IN_LRU_QUEUE)
-                cm_RemoveVolumeFromLRU(volp);
-            if (volp->qflags & CM_VOLUME_QFLAG_IN_HASH)
-                cm_RemoveVolumeFromNameHashTable(volp);
+            volp->cellp = cellp;
+            strncpy(volp->namep, name, VL_MAXNAMELEN);
+            volp->namep[VL_MAXNAMELEN-1] = '\0';
+            volp->flags = CM_VOLUMEFLAG_RESET;
+            volp->lastUpdateTime = 0;
 
             for ( volType = RWVOL; volType < NUM_VOL_TYPES; volType++) {
-                if (volp->vol[volType].qflags & CM_VOLUME_QFLAG_IN_HASH)
-                    cm_RemoveVolumeFromIDHashTable(volp, volType);
-                if (volp->vol[volType].ID)
-                    cm_VolumeStatusNotification(volp, volp->vol[volType].ID, volp->vol[volType].state, vl_unknown);
-                volp->vol[volType].ID = 0;
-                cm_SetFid(&volp->vol[volType].dotdotFid, 0, 0, 0, 0);
-                lock_ReleaseWrite(&cm_volumeLock);
-                cm_FreeServerList(&volp->vol[volType].serversp, CM_FREESERVERLIST_DELETE);
-                lock_ObtainWrite(&cm_volumeLock);
+                volp->vol[volType].state = vl_unknown;
+                volp->vol[volType].nextp = NULL;
+                volp->vol[volType].flags = 0;
             }
-       } else {
-           volp = &cm_data.volumeBaseAddress[cm_data.currentVolumes++];
-           memset(volp, 0, sizeof(cm_volume_t));
-           volp->magic = CM_VOLUME_MAGIC;
-           volp->allNextp = cm_data.allVolumesp;
-           cm_data.allVolumesp = volp;
-           lock_InitializeRWLock(&volp->rw, "cm_volume_t rwlock", LOCK_HIERARCHY_VOLUME);
+            volp->cbExpiresRO = 0;
+            volp->cbIssuedRO = 0;
+            volp->cbServerpRO = NULL;
+            volp->creationDateRO = 0;
+            cm_AddVolumeToNameHashTable(volp);
+            cm_GetVolume(volp);
             lock_ReleaseWrite(&cm_volumeLock);
             lock_ObtainWrite(&volp->rw);
-            lock_ObtainWrite(&cm_volumeLock);
-            volp->refCount = 1;        /* starts off held */
-        }
-       volp->cellp = cellp;
-       strncpy(volp->namep, name, VL_MAXNAMELEN);
-       volp->namep[VL_MAXNAMELEN-1] = '\0';
-       volp->flags = CM_VOLUMEFLAG_RESET;
-        volp->lastUpdateTime = 0;
-
-        for ( volType = RWVOL; volType < NUM_VOL_TYPES; volType++) {
-            volp->vol[volType].state = vl_unknown;
-            volp->vol[volType].nextp = NULL;
-            volp->vol[volType].flags = 0;
+            for ( volType = RWVOL; volType < NUM_VOL_TYPES; volType++) {
+                cm_FreeServerList(&volp->vol[volType].serversp, CM_FREESERVERLIST_DELETE);
+            }
         }
-        volp->cbExpiresRO = 0;
-        volp->cbIssuedRO = 0;
-        volp->cbServerpRO = NULL;
-        volp->creationDateRO = 0;
-        cm_AddVolumeToNameHashTable(volp);
-        lock_ReleaseWrite(&cm_volumeLock);
     }
-    else {
-        if (volp)
-            cm_GetVolume(volp);
+    else if (volp) {
+        cm_GetVolume(volp);
         lock_ReleaseRead(&cm_volumeLock);
-
-        if (!volp)
-            return CM_ERROR_NOSUCHVOLUME;
-
         lock_ObtainWrite(&volp->rw);
+    } else {
+        lock_ReleaseRead(&cm_volumeLock);
+        return CM_ERROR_NOSUCHVOLUME;
     }
 
     /* if we get here we are holding the mutex */