windows-bplus-tree-20070823
authorJeffrey Altman <jaltman@secure-endpoints.com>
Fri, 24 Aug 2007 04:19:18 +0000 (04:19 +0000)
committerJeffrey Altman <jaltman@secure-endpoints.com>
Fri, 24 Aug 2007 04:19:18 +0000 (04:19 +0000)
Windows uses case-insensitive file name pattern matching
but AFS is a case sensitive file system.  The AFS3 directory
format is block based, uses network byte order and
includes a hash table for fast case sensitive lookups.
This causes several problems for the Windows AFS client.
(1) Traversing the directory blocks is cpu expensive
(2) A hash table miss does not indicate that the desired
    entry does not exist.
(3) Determining whether a non-ambiguous inexact match or
    the entry does not exist requires a linear traversal
    of the entire directory.
These issues often result in 100% CPU utilization.

These issues are addressed by building a modified B+ tree for
each directory and then using the B+ tree for searches.

Further improvements can be made by using the B+ tree leaf
nodes for directory enumeration.

14 files changed:
src/WINNT/afsd/NTMakefile
src/WINNT/afsd/afsd.h
src/WINNT/afsd/afsd_init.c
src/WINNT/afsd/afsd_service.c
src/WINNT/afsd/cm.h
src/WINNT/afsd/cm_btree.c
src/WINNT/afsd/cm_btree.h
src/WINNT/afsd/cm_dir.c
src/WINNT/afsd/cm_dir.h
src/WINNT/afsd/cm_scache.c
src/WINNT/afsd/cm_scache.h
src/WINNT/afsd/cm_utils.c
src/WINNT/afsd/cm_vnodeops.c
src/WINNT/afsd/lock.txt

index b9ac3e4..6a4fce1 100644 (file)
@@ -98,6 +98,7 @@ AFSDOBJS=\
         $(OUT)\rawops.obj \
         $(OUT)\afsdifs.obj \
        $(OUT)\afsd_init.obj \
+        $(OUT)\cm_btree.obj \
        $(OUT)\cm_cell.obj \
        $(OUT)\cm_server.obj \
        $(OUT)\cm_volume.obj \
index a06adf6..0e1b2ec 100644 (file)
@@ -10,6 +10,8 @@
 #ifndef __AFSD_H_ENV__
 #define __AFSD_H_ENV__ 1
 
+#define USE_BPLUS 1
+
 #include <afs/param.h>
 
 BOOL InitClass(HANDLE);
@@ -23,8 +25,13 @@ BOOL APIENTRY About(HWND, unsigned int, unsigned int, long);
 #include "cm.h"
 
 #include <osi.h>
+#include <afs/vldbint.h>
+#include <afs/afsint.h>
+#include <afs/prs_fs.h>
+
 #include "cm_config.h"
 #include "cm_user.h"
+#include "cm_scache.h"
 #include "cm_callback.h"
 #ifdef DISKCACHE95
 #include "cm_diskcache95.h"
@@ -33,7 +40,6 @@ BOOL APIENTRY About(HWND, unsigned int, unsigned int, long);
 #include "cm_aclent.h"
 #include "cm_server.h"
 #include "cm_cell.h"
-#include "cm_scache.h"
 #include "cm_volume.h"
 #include "cm_volstat.h"
 #include "cm_dcache.h"
@@ -51,9 +57,6 @@ BOOL APIENTRY About(HWND, unsigned int, unsigned int, long);
 #include "afsd_init.h"
 #include "afsd_eventlog.h"
 
-#include <afs/vldbint.h>
-#include <afs/afsint.h>
-#include <afs/prs_fs.h>
 
 #define AFS_DAEMON_SERVICE_NAME AFSREG_CLT_SVC_NAME
 #define AFS_DAEMON_EVENT_NAME   AFSREG_CLT_SW_NAME
index b853bdc..6436568 100644 (file)
@@ -40,6 +40,7 @@ extern int RXSTATS_ExecuteRequest(struct rx_call *z_call);
 extern afs_int32 cryptall;
 extern int cm_enableServerLocks;
 extern int cm_deleteReadOnly;
+extern afs_int32 cm_BPlusTrees;
 
 osi_log_t *afsd_logp;
 
@@ -1055,6 +1056,14 @@ int afsd_InitCM(char **reasonP)
     } 
     afsi_log("CM DeleteReadOnly is %u", cm_deleteReadOnly);
     
+    dummyLen = sizeof(DWORD);
+    code = RegQueryValueEx(parmKey, "BPlusTrees", NULL, NULL,
+                           (BYTE *) &dwValue, &dummyLen);
+    if (code == ERROR_SUCCESS) {
+        cm_BPlusTrees = (unsigned short) dwValue;
+    } 
+    afsi_log("CM BPlusTrees is %u", cm_BPlusTrees);
+    
     RegCloseKey (parmKey);
 
     cacheBlocks = ((afs_uint64)cacheSize * 1024) / blockSize;
index d850037..ea3fdd4 100644 (file)
@@ -1469,6 +1469,11 @@ afsd_Main(DWORD argc, LPTSTR *argv)
         PowerNotificationThreadExit();
 #endif
 
+    cm_DirDumpStats();
+#ifdef USE_BPLUS
+    cm_BPlusDumpStats();
+#endif
+
     /* Notify any Volume Status Handlers that we are stopped */
     cm_VolStatus_Service_Stopped();
 
index 379d2a3..5a8d473 100644 (file)
@@ -297,6 +297,7 @@ int RXAFS_Lookup (struct rx_connection *,
 #define CM_ERROR_TOOMANYBUFS           (CM_ERROR_BASE+51)
 #define CM_ERROR_BAD_LEVEL             (CM_ERROR_BASE+52)
 #define CM_ERROR_NOT_A_DFSLINK          (CM_ERROR_BASE+53)
+#define CM_ERROR_INEXACT_MATCH          (CM_ERROR_BASE+54)
 
 /* Used by cm_FollowMountPoint and cm_GetVolumeByName */
 #define RWVOL  0
index 6ac6acf..de2bbcc 100644 (file)
@@ -1 +1,1591 @@
-/* Place holder for B+ tree source code */
+/*
+ * Copyright 2007 Secure Endpoints Inc.  
+ * 
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ *
+ * Thanks to Jan Jannink for B+ tree algorithms.
+ */
+
+#include <windows.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include "afsd.h"
+
+#ifdef USE_BPLUS
+#include "cm_btree.h"
+
+/******************* statistics globals  *********************************/
+afs_uint32 bplus_lookup_hits = 0;
+afs_uint32 bplus_lookup_hits_inexact = 0;
+afs_uint32 bplus_lookup_misses = 0;
+afs_uint32 bplus_lookup_ambiguous = 0;
+afs_uint32 bplus_create_entry = 0;
+afs_uint32 bplus_remove_entry = 0;
+afs_uint32 bplus_build_tree = 0;
+afs_uint32 bplus_free_tree = 0;
+afs_uint32 bplus_dv_error = 0;
+
+afs_uint64 bplus_lookup_time = 0;
+afs_uint64 bplus_create_time = 0;
+afs_uint64 bplus_remove_time = 0;
+afs_uint64 bplus_build_time = 0;
+afs_uint64 bplus_free_time = 0;
+
+/***********************   private functions   *************************/
+static void initFreeNodePool(Tree *B, int quantity);
+static Nptr getFreeNode(Tree *B);
+static void putFreeNode(Tree *B, Nptr self);
+static void cleanupNodePool(Tree *B);
+
+static Nptr descendToLeaf(Tree *B, Nptr curr);
+static int getSlot(Tree *B, Nptr curr);
+static int findKey(Tree *B, Nptr curr, int lo, int hi);
+static int bestMatch(Tree *B, Nptr curr, int slot);
+
+static Nptr getDataNode(Tree *B, keyT key, dataT data);
+static Nptr descendSplit(Tree *B, Nptr curr);
+static void insertEntry(Tree *B, Nptr node, int slot, Nptr sibling, Nptr downPtr);
+static void placeEntry(Tree *B, Nptr node, int slot, Nptr downPtr);
+static Nptr split(Tree *B, Nptr node);
+static void makeNewRoot(Tree *B, Nptr oldRoot, Nptr newNode);
+
+static Nptr descendBalance(Tree *B, Nptr curr, Nptr left, Nptr right, Nptr lAnc, Nptr rAnc, Nptr parent);
+static void collapseRoot(Tree *B, Nptr oldRoot, Nptr newRoot);
+static void removeEntry(Tree *B, Nptr curr, int slot);
+static Nptr merge(Tree *B, Nptr left, Nptr right, Nptr anchor);
+static Nptr shift(Tree *B, Nptr left, Nptr right, Nptr anchor);
+
+static void _clrentry(Nptr node, int entry);
+static void _pushentry(Nptr node, int entry, int offset);
+static void _pullentry(Nptr node, int entry, int offset);
+static void _xferentry(Nptr srcNode, int srcEntry, Nptr destNode, int destEntry);
+static void _setentry(Nptr node, int entry, keyT key, Nptr downNode);
+
+#ifdef DEBUG_BTREE
+static int _isRoot(Tree *B, Nptr n)
+{
+    int flagset = (n->flags & isROOT) == isROOT);
+
+    if (!isnode(n))
+        return 0;
+
+    if (flagset && n != getroot(B))
+        DebugBreak();
+
+    return flagset;
+}
+
+static int _isFew(Tree *B, Nptr n)
+{
+    int flagset = (n->flags & FEWEST) == FEWEST);
+    int fanout = getminfanout(B, n);
+    int entries = numentries(n);
+    int mincnt  = entries <= fanout;
+
+    if (!isnode(n))
+        return 0;
+
+    if (flagset && !mincnt || !flagset && mincnt)
+        DebugBreak();
+
+    return flagset;
+}
+
+static int _isFull(Tree *B, Nptr n)
+{
+    int flagset = (n->flags & isFULL) == isFULL);
+    int maxcnt  = numentries(n) == getfanout(B);
+
+    if (!isnode(n))
+        return 0;
+
+    if (flagset && !maxcnt || !flagset && maxcnt)
+        DebugBreak();
+
+    return flagset;
+}
+#endif /* DEBUG_BTREE */
+
+/***********************************************************************\
+|      B+tree Initialization and Cleanup Routines                      |
+\***********************************************************************/
+
+/********************   Set up B+tree structure   **********************/
+Tree *initBtree(unsigned int poolsz, unsigned int fanout, KeyCmp keyCmp)
+{
+    Tree *B;
+    keyT empty = {NULL};
+    dataT data = {0,0,0,0};
+
+    if (fanout > MAX_FANOUT)
+        fanout = MAX_FANOUT;
+
+    setbplustree(B, malloc(sizeof(Tree)));
+    memset(B, 0, sizeof(Tree));
+    setfanout(B, fanout);
+    setminfanout(B, (fanout + 1) >> 1);
+    initFreeNodePool(B, poolsz);
+
+    setleaf(B, getFreeNode(B));                /* set up the first leaf node */
+    setroot(B, getleaf(B));            /* the root is initially the leaf */
+    setflag(getroot(B), isLEAF);
+    setflag(getroot(B), isROOT);
+    setflag(getroot(B), FEWEST);
+    inittreeheight(B);
+
+    setfunkey(B,empty);
+    setfundata(B,data);
+    setcomparekeys(B, keyCmp);
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "INIT:  B+tree of fanout %d at %10p.\n", fanout, (void *)B);
+    OutputDebugString(B->message);
+#endif
+
+  return B;
+}
+
+/********************   Clean up B+tree structure   ********************/
+/*
+ *  dirlock must be write locked
+ */
+void freeBtree(Tree *B)
+{
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "FREE:  B+tree at %10p.\n", (void *) B);
+    OutputDebugString(B->message);
+#endif
+
+    cleanupNodePool(B);
+
+    memset(B, 0, sizeof(*B));
+    free((void *) B);
+}
+
+
+/***********************************************************************\
+|      Find location for data                                          |
+\***********************************************************************/
+
+/**********************   top level lookup   **********************/
+Nptr bplus_Lookup(Tree *B, keyT key)
+{
+    Nptr       findNode;
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "LOOKUP:  key %s.\n", key.name);
+    OutputDebugString(B->message);
+#endif
+
+    setfunkey(B, key);                 /* set search key */
+    findNode = descendToLeaf(B, getroot(B));   /* start search from root node */
+
+#ifdef DEBUG_BTREE
+    if (findNode) {
+        int         slot;
+        Nptr        dataNode;
+        dataT       data;
+
+        slot = findKey(B, findNode, 1, numentries(findNode));
+        dataNode = getnode(findNode, slot);
+        data = getdatavalue(dataNode);
+
+        sprintf(B->message, "LOOKUP: %s found on page %d value (%d.%d.%d).\n",
+                 key.name,
+                 getnodenumber(B, findNode), 
+                 data.volume, 
+                 data.vnode,
+                 data.unique);
+    } else 
+        sprintf(B->message, "LOOKUP: not found!\n");
+    OutputDebugString(B->message);
+#endif
+
+    return findNode;
+}
+
+/**********************   `recurse' down B+tree   **********************/
+static Nptr descendToLeaf(Tree *B, Nptr curr)
+{
+    int        slot;
+    Nptr       findNode;
+    Nptr    prev[64];
+    int depth;
+
+    memset(prev, 0, sizeof(prev));
+
+    for (depth = 0, slot = getSlot(B, curr); isinternal(curr); depth++, slot = getSlot(B, curr)) {
+        prev[depth] = curr;
+        if (slot == 0)
+            curr = getfirstnode(curr);
+        else
+            curr = getnode(curr, slot);
+#ifdef DEBUG_BTREE
+        if ( !isnode(curr) )
+            DebugBreak();
+#endif
+    }
+    if ((slot > 0) && !comparekeys(B)(getfunkey(B), getkey(curr, slot), 0))
+        findNode = curr;                       /* correct key value found */
+    else
+        findNode = NONODE;                     /* key value not in tree */
+
+    return findNode;
+}
+
+/********************   find slot for search key   *********************/
+static int getSlot(Tree *B, Nptr curr)
+{
+    int slot, entries;
+
+    entries = numentries(curr);                /* need this if root is ever empty */
+    slot = !entries ? 0 : findKey(B, curr, 1, entries);
+
+    return slot;
+}
+
+
+/********************   recursive binary search   **********************/
+static int findKey(Tree *B, Nptr curr, int lo, int hi)
+{
+    int mid, findslot = BTERROR;
+
+    if (hi == lo) {
+        findslot = bestMatch(B, curr, lo);             /* recursion base case */
+
+#ifdef DEBUG_BTREE
+        if (findslot == BTERROR) {
+            sprintf(B->message, "Bad key ordering on node %d\n", getnodenumber(B, curr));
+            OutputDebugString(B->message);
+        }
+#endif
+    } else {
+        mid = (lo + hi) >> 1;
+        switch (findslot = bestMatch(B, curr, mid)) {
+        case BTLOWER:                          /* check lower half of range */
+            findslot = findKey(B, curr, lo, mid - 1);          /* never in 2-3+trees */
+            break;
+        case BTUPPER:                          /* check upper half of range */
+            findslot = findKey(B, curr, mid + 1, hi);
+            break;
+        case BTERROR:
+            sprintf(B->message, "Bad key ordering on node %d\n", getnodenumber(B, curr));
+            OutputDebugString(B->message);
+        }
+    }
+    return findslot;
+}
+
+
+/************   comparison of key with a target key slot   *************/
+static int bestMatch(Tree *B, Nptr curr, int slot)
+{
+    int diff, comp, findslot;
+
+    diff = comparekeys(B)(getfunkey(B), getkey(curr, slot), 0);
+    if (diff < 0) {            /* also check previous slot */
+        if ((slot == 1) ||
+             ((comp = comparekeys(B)(getfunkey(B), getkey(curr, slot - 1), 0)) >= 0)) 
+        {
+            findslot = slot - 1;
+        }
+        else if (comp < diff) {
+            findslot = BTERROR;                /* inconsistent ordering of keys */
+#ifdef DEBUG_BTREE
+            DebugBreak();
+#endif
+        }
+        else {
+            findslot = BTLOWER;                /* key must be below in node ordering */
+        }
+    } else {                   /* or check following slot */
+        if ((slot == numentries(curr)) ||
+             ((comp = comparekeys(B)(getfunkey(B), getkey(curr, slot + 1), 0)) < 0))
+        {
+            findslot = slot;
+        }
+        else if (comp == 0) {
+            findslot = slot + 1;
+        }
+        else if (comp > diff) {
+            findslot = BTERROR;                /* inconsistent ordering of keys */
+#ifdef DEBUG_BTREE
+            DebugBreak();
+#endif
+        }
+        else {
+            findslot = BTUPPER;                /* key must be above in node ordering */
+        }
+    }
+    return findslot;
+}
+
+
+/***********************************************************************\
+|      Insert new data into tree                                       |
+\***********************************************************************/
+
+
+/**********************   top level insert call   **********************/
+void insert(Tree *B, keyT key, dataT data)
+{
+    Nptr newNode;
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "INSERT:  key %s.\n", key.name);
+    OutputDebugString(B->message);
+#endif
+
+    setfunkey(B, key);                         /* set insertion key */
+    setfundata(B, data);                       /* a node containing data */
+    setsplitpath(B, NONODE);
+    newNode = descendSplit(B, getroot(B));     /* insertion point search from root */
+    if (newNode != getsplitpath(B))            /* indicates the root node has split */
+        makeNewRoot(B, getroot(B), newNode);
+}
+
+
+/*****************   recurse down and split back up   ******************/
+static Nptr 
+descendSplit(Tree *B, Nptr curr)
+{
+    Nptr       downNode = NONODE, sibling = NONODE;
+    int        slot;
+
+#ifdef DEBUG_BTREE
+    if (!isnode(curr))
+        DebugBreak();
+#endif
+    if (!isfull(curr))
+        setsplitpath(B, NONODE);
+    else if (getsplitpath(B) == NONODE)
+        setsplitpath(B, curr);                 /* indicates where nodes must split */
+
+    slot = getSlot(B, curr);                   /* is null only if the root is empty */
+    if (isinternal(curr)) {    /* continue recursion to leaves */
+        if (slot == 0)
+            downNode = descendSplit(B, getfirstnode(curr));
+        else
+            downNode = descendSplit(B, getnode(curr, slot));
+    } else if ((slot > 0) && !comparekeys(B)(getfunkey(B), getkey(curr, slot), 0)) {
+        if (!(gettreeflags(B) & TREE_FLAG_UNIQUE_KEYS)) {
+            downNode = getDataNode(B, getfunkey(B), getfundata(B));
+            getdatanext(downNode) = getnode(curr,slot);
+            setnode(curr, slot, downNode);
+        }
+        downNode = NONODE;
+        setsplitpath(B, NONODE);
+    }
+    else
+        downNode = getDataNode(B, getfunkey(B), getfundata(B));        /* an insertion takes place */
+
+    if (downNode != NONODE) {                  /* insert only where necessary */
+        if (getsplitpath(B) != NONODE)
+            sibling = split(B, curr);          /* a sibling node is prepared */
+        insertEntry(B, curr, slot, sibling, downNode);
+    }
+
+    return sibling;
+}
+
+/***************   determine location of inserted key   ****************/
+static void 
+insertEntry(Tree *B, Nptr currNode, int slot, Nptr sibling, Nptr downPtr)
+{
+    int split, i, j, k, x, y;
+    keyT key;
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "INSERT:  slot %d, down node %d.\n", slot, getnodenumber(B, downPtr));
+    OutputDebugString(B->message);
+#endif
+
+    if (sibling == NONODE) {           /* no split occurred */
+        placeEntry(B, currNode, slot + 1, downPtr);
+    }
+    else {                             /* split entries between the two */
+        if isinternal(currNode) {
+            i = 1;
+            split = getfanout(B) - getminfanout(B, currNode);
+        } else if (isroot(currNode)) {
+            /* split the root node and turn it into just a leaf */
+            i = 0;
+            split = getminfanout(B, currNode);
+        } else  {
+            i = 0;
+            split = getminfanout(B, currNode);
+        }
+        j = (slot != split ? 1 : 0);
+        k = (slot >= split ? 1 : 0);
+
+        /*
+         * Move entries from the top half of the current node to 
+         * to the sibling node.
+         * The number of entries to move is dependent upon where
+         * the new entry is going to be added in relationship to
+         * the split slot. (slots are 1-based).  If order to produce
+         * a balanced tree, if the insertion slot is greater than
+         * the split we move one less entry as the new entry will
+         * be inserted into the sibling.
+         *
+         * If the node that is being split is an internal node (i != 0)
+         * then we move one less entry due to the extra down pointer
+         * when the split slot is not equal to the insertion slot 
+         */
+        for (x = split + k + j * i, y = 1; x <= getfanout(B); x++, y++) {
+            xferentry(currNode, x, sibling, y);        /* copy entries to sibling */
+            clrentry(currNode, x);
+            decentries(currNode);
+            incentries(sibling);
+        }
+        clrflag(currNode, isFULL);
+        if (numentries(currNode) == getminfanout(B, currNode))
+            setflag(currNode, FEWEST);         /* never happens in even size nodes */
+
+#ifdef DEBUG_BTREE
+        if (numentries(sibling) > getfanout(B))
+            DebugBreak();
+#endif
+        if (numentries(sibling) == getfanout(B))
+            setflag(sibling, isFULL);          /* only ever happens in 2-3+trees */
+
+        if (numentries(sibling) > getminfanout(B, sibling))
+            clrflag(sibling, FEWEST);
+
+        if (i) {                               /* set first pointer of internal node */
+            if (j) {
+                setfirstnode(sibling, getnode(currNode, split + k));
+                decentries(currNode);
+                if (numentries(currNode) == getminfanout(B, currNode))
+                    setflag(currNode, FEWEST);
+                else
+                    clrflag(currNode, FEWEST);
+            }
+            else
+                setfirstnode(sibling, downPtr);
+        }
+
+        if (j) {                               /* insert new entry into correct spot */
+            if (k)
+                placeEntry(B, sibling, slot - split + 1 - i, downPtr);
+            else
+                placeEntry(B, currNode, slot + 1, downPtr);
+
+            /* set key separating nodes */
+            if (isleaf(sibling))
+                key = getkey(sibling, 1); 
+            else {
+                Nptr leaf = getfirstnode(sibling);
+                while ( isinternal(leaf) )
+                    leaf = getfirstnode(leaf);
+                key = getkey(leaf, 1);
+            }
+            setfunkey(B, key);
+        }
+        else if (!i)
+            placeEntry(B, sibling, 1, downPtr);
+    }
+}
+
+/************   place key into appropriate node & slot   ***************/
+static void 
+placeEntry(Tree *B, Nptr node, int slot, Nptr downPtr)
+{
+    int x;
+
+#ifdef DEBUG_BTREE
+    if (isfull(node))
+        DebugBreak();
+#endif
+
+    for (x = numentries(node); x >= slot; x--) /* make room for new entry */
+        pushentry(node, x, 1);
+    setentry(node, slot, getfunkey(B), downPtr);/* place it in correct slot */
+
+    incentries(node);                          /* adjust entry counter */
+    if (numentries(node) == getfanout(B))
+        setflag(node, isFULL);
+    if (numentries(node) > getminfanout(B, node))
+        clrflag(node, FEWEST);
+    else
+        setflag(node, FEWEST);
+}
+
+
+/*****************   split full node and set flags   *******************/
+static Nptr 
+split(Tree *B, Nptr node)
+{
+    Nptr sibling;
+
+    sibling = getFreeNode(B);
+
+    setflag(sibling, FEWEST);                  /* set up node flags */
+
+    if (isleaf(node)) {
+        setflag(sibling, isLEAF);
+        setnextnode(sibling, getnextnode(node));/* adjust leaf pointers */
+        setnextnode(node, sibling);
+    }
+    if (getsplitpath(B) == node)
+        setsplitpath(B, NONODE);               /* no more splitting needed */
+
+    if (isroot(node))
+        clrflag(node, isROOT);
+
+    return sibling;
+}
+
+
+/**********************   build new root node   ************************/
+static void
+makeNewRoot(Tree *B, Nptr oldRoot, Nptr newNode)
+{
+    setroot(B, getFreeNode(B));
+
+    setfirstnode(getroot(B), oldRoot); /* old root becomes new root's child */
+    setentry(getroot(B), 1, getfunkey(B), newNode);    /* old root's sibling also */
+    incentries(getroot(B));
+#ifdef DEBUG_BTREE
+    if (numentries(getroot(B)) > getfanout(B))
+        DebugBreak();
+#endif
+
+    /* the oldRoot's isROOT flag was cleared in split() */
+    setflag(getroot(B), isROOT);
+    setflag(getroot(B), FEWEST);
+    clrflag(getroot(B), isLEAF);
+    inctreeheight(B);
+}
+
+
+/***********************************************************************\
+|      Delete data from tree                                           |
+\***********************************************************************/
+
+/**********************   top level delete call   **********************\
+|
+|      The recursive call for deletion carries 5 additional parameters
+|      which may be needed to rebalance the B+tree when removing the key.
+|      These parameters are:
+|              1. immediate left neighbor of the current node
+|              2. immediate right neighbor of the current node
+|              3. the anchor of the current node and left neighbor
+|              4. the anchor of the current node and right neighbor
+|              5. the parent of the current node
+|
+|      All of these parameters are simple to calculate going along the
+|      recursive path to the leaf nodes and the point of key deletion.
+|      At that time, the algorithm determines which node manipulations
+|      are most efficient, that is, cause the least rearranging of data,
+|      and minimize the need for non-local key manipulation.
+|
+\***********************************************************************/
+void delete(Tree *B, keyT key)
+{
+    Nptr newNode;
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "DELETE:  key %s.\n", key.name);
+    OutputDebugString(B->message);
+#endif
+
+    setfunkey(B, key);                 /* set deletion key */
+    setmergepath(B, NONODE);
+    newNode = descendBalance(B, getroot(B), NONODE, NONODE, NONODE, NONODE, NONODE);
+    if (isnode(newNode)) {
+#ifdef DEBUG_BTREE
+        sprintf(B->message, "DELETE: collapsing node %d", getnodenumber(B, newNode));
+        OutputDebugString(B->message);
+#endif
+        collapseRoot(B, getroot(B), newNode);  /* remove root when superfluous */
+    }
+}
+
+
+/**********************   remove old root node   ***********************/
+static void 
+collapseRoot(Tree *B, Nptr oldRoot, Nptr newRoot)
+{
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "COLLAPSE:  old %d, new %d.\n", getnodenumber(B, oldRoot), getnodenumber(B, newRoot));
+    OutputDebugString(B->message);
+    showNode(B, "collapseRoot oldRoot", oldRoot);
+    showNode(B, "collapseRoot newRoot", newRoot);
+#endif
+
+    setroot(B, newRoot);
+    setflag(newRoot, isROOT);
+    clrflag(newRoot, FEWEST);
+    putFreeNode(B, oldRoot);
+    dectreeheight(B);                  /* the height of the tree decreases */
+}
+
+
+/****************   recurse down and balance back up   *****************/
+static Nptr 
+descendBalance(Tree *B, Nptr curr, Nptr left, Nptr right, Nptr lAnc, Nptr rAnc, Nptr parent)
+{
+    Nptr       newMe=NONODE, myLeft=NONODE, myRight=NONODE, lAnchor=NONODE, rAnchor=NONODE, newNode=NONODE;
+    int        slot = 0, notleft = 0, notright = 0, fewleft = 0, fewright = 0, test = 0;
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "descendBalance curr %d, left %d, right %d, lAnc %d, rAnc %d, parent %d\n",
+             curr ? getnodenumber(B, curr) : -1,
+             left ? getnodenumber(B, left) : -1,
+             right ? getnodenumber(B, right) : -1,
+             lAnc ? getnodenumber(B, lAnc) : -1,
+             rAnc ? getnodenumber(B, rAnc) : -1,
+             parent ? getnodenumber(B, parent) : -1);
+    OutputDebugString(B->message);
+#endif
+
+    if (!isfew(curr))
+        setmergepath(B,NONODE);
+    else if (getmergepath(B) == NONODE)
+        setmergepath(B, curr);         /* mark which nodes may need rebalancing */
+
+    slot = getSlot(B, curr);
+
+    if (isinternal(curr))      /* set up next recursion call's parameters */
+    {
+        if (slot == 0) {
+            newNode = getfirstnode(curr);
+            myLeft = !isnode(left) ? NONODE : getlastnode(left);
+            lAnchor = lAnc;
+        }
+        else {
+            newNode = getnode(curr, slot);
+            if (slot == 1)
+                myLeft = getfirstnode(curr);
+            else
+                myLeft = getnode(curr, slot - 1);
+            lAnchor = curr;
+        }
+
+        if (slot == numentries(curr)) {
+            myRight = !isnode(right) ? NONODE : getfirstnode(right);
+            rAnchor = rAnc;
+        }
+        else {
+            myRight = getnode(curr, slot + 1);
+            rAnchor = curr;
+        }
+        newMe = descendBalance(B, newNode, myLeft, myRight, lAnchor, rAnchor, curr);
+    }
+    else if ((slot > 0) && !comparekeys(B)(getfunkey(B), getkey(curr, slot), 0)) 
+    {
+        Nptr        next;
+        int         exact = 0;
+        int         count = 0;
+
+        newNode = getnode(curr, slot);
+        next = getdatanext(newNode);
+
+        /*
+         * We only delete exact matches.  
+         */
+        if (!comparekeys(B)(getfunkey(B), getdatakey(newNode), EXACT_MATCH)) {
+            /* exact match, free the first entry */
+            setnode(curr, slot, next);
+
+            if (next == NONODE) {
+                /* delete this key as there are no more data values */
+                newMe = newNode;
+            } else { 
+                /* otherwise, there are more and we leave the key in place */
+                setnode(curr, slot, next);
+                putFreeNode(B, newNode);
+
+                /* but do not delete the key */
+                newMe = NONODE;
+                setmergepath(B, NONODE);
+            }
+        } else if (next == NONODE) {
+            /* 
+             * we didn't find an exact match and there are no more
+             * choices.  so we leave it alone and remove nothing.
+             */
+            newMe = NONODE;
+            setmergepath(B, NONODE);
+        } else {
+            /* The first data node doesn't match but there are other 
+             * options.  So we must determine if any of the next nodes
+             * are the one we are looking for.
+             */
+            Nptr dataNode = newNode;
+
+            while ( next ) {
+                if (!comparekeys(B)(getfunkey(B), getdatakey(next), EXACT_MATCH)) {
+                    /* we found the one to delete */
+                    getdatanext(dataNode) = getdatanext(next);
+                    putFreeNode(B, next);
+                }
+            }
+            
+            /* do not delete the key */
+            newMe = NONODE;
+            setmergepath(B, NONODE);
+        }
+    }
+    else 
+    {
+        newMe = NONODE;                /* no deletion possible, key not found */
+        setmergepath(B, NONODE);
+    }
+
+/*****************   rebalancing tree after deletion   *****************\
+|
+|      The simplest B+tree rebalancing consists of the following rules.
+|
+|      If a node underflows:
+|      CASE 1 check if it is the root, and collapse it if it is,
+|      CASE 2 otherwise, check if both of its neighbors are minimum
+|          sized and merge the underflowing node with one of them,
+|      CASE 3 otherwise shift surplus entries to the underflowing node.
+|
+|      The choice of which neighbor to use is optional.  However, the
+|      rebalancing rules that follow also ensure whenever possible
+|      that the merges and shifts which do occur use a neighbor whose
+|      anchor is the parent of the underflowing node.
+|
+|      Cases 3, 4, 5 below are more an optimization than a requirement,
+|      and can be omitted, with a change of the action value in case 6,
+|      which actually corresponds to the third case described above.
+|
+\***********************************************************************/
+
+    /* begin deletion, working upwards from leaves */
+
+    if (newMe != NONODE) {     /* this node removal doesn't consider duplicates */
+#ifdef DEBUG_BTREE
+        sprintf(B->message, "descendBalance DELETE:  slot %d, node %d.\n", slot, getnodenumber(B, curr));
+        OutputDebugString(B->message);
+#endif
+
+        removeEntry(B, curr, slot + (newMe != newNode));       /* removes one of two */
+
+#ifdef DEBUG_BTREE
+        showNode(B, "descendBalance curr", curr);
+#endif
+    }
+
+    if (getmergepath(B) == NONODE)
+        newNode = NONODE;
+    else {             /* tree rebalancing rules for node merges and shifts */
+        notleft = !isnode(left);
+        notright = !isnode(right);
+        if (!notleft)
+            fewleft = isfew(left);             /* only used when defined */
+        if (!notright)
+            fewright = isfew(right);
+
+        /* CASE 1:  prepare root node (curr) for removal */
+        if (notleft && notright) {
+            test = isleaf(curr);               /* check if B+tree has become empty */
+            newNode = test ? NONODE : getfirstnode(curr);
+        }
+        /* CASE 2:  the merging of two nodes is a must */
+        else if ((notleft || fewleft) && (notright || fewright)) {
+            test = (lAnc != parent);
+            newNode = test ? merge(B, curr, right, rAnc) : merge(B, left, curr, lAnc);
+        }
+        /* CASE 3: choose the better of a merge or a shift */
+        else if (!notleft && fewleft && !notright && !fewright) {
+            test = (rAnc != parent) && (curr == getmergepath(B));
+            newNode = test ? merge(B, left, curr, lAnc) : shift(B, curr, right, rAnc);
+        }
+        /* CASE 4: also choose between a merge or a shift */
+        else if (!notleft && !fewleft && !notright && fewright) {
+            test = !(lAnc == parent) && (curr == getmergepath(B));
+            newNode = test ? merge(B, curr, right, rAnc) : shift(B, left, curr, lAnc);
+        }
+        /* CASE 5: choose the more effective of two shifts */
+        else if (lAnc == rAnc) {               /* => both anchors are the parent */
+            test = (numentries(left) <= numentries(right));
+            newNode = test ? shift(B, curr, right, rAnc) : shift(B, left, curr, lAnc);
+        }
+        /* CASE 6: choose the shift with more local effect */
+        else {                         /* if omitting cases 3,4,5 use below */
+            test = (lAnc == parent);           /* test = (!notleft && !fewleft); */
+            newNode = test ? shift(B, left, curr, lAnc) : shift(B, curr, right, rAnc);
+        }
+    }
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "descendBalance returns %d\n", getnodenumber(B, newNode));
+    OutputDebugString(B->message);
+#endif
+    return newNode;
+}
+
+
+/****************   remove key and pointer from node   *****************/
+static void
+removeEntry(Tree *B, Nptr curr, int slot)
+{
+    int x;
+
+    putFreeNode(B, getnode(curr, slot));       /* return deleted node to free list */
+    for (x = slot; x < numentries(curr); x++)
+        pullentry(curr, x, 1);                 /* adjust node with removed key */
+    decentries(curr);
+    clrflag(curr, isFULL);                     /* keep flag information up to date */
+    if (numentries(curr) > getminfanout(B, curr))
+        clrflag(curr, FEWEST);
+    else
+        setflag(curr, FEWEST);
+}
+
+
+/*******   merge a node pair & set emptied node up for removal   *******/
+static Nptr 
+merge(Tree *B, Nptr left, Nptr right, Nptr anchor)
+{
+    int        x, y, z;
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "MERGE:  left %d, right %d.\n", getnodenumber(B, left), getnodenumber(B, right));
+    OutputDebugString(B->message);
+    showNode(B, "pre-merge anchor", anchor);
+    showNode(B, "pre-merge left", left);
+    showNode(B, "pre-merge right", right);
+#endif
+
+    if (isinternal(left)) {
+        incentries(left);                      /* copy key separating the nodes */
+#ifdef DEBUG_BTREE
+        if (numentries(left) > getfanout(B))
+            DebugBreak();
+#endif
+        setfunkey(B, getkey(right, 1));        /* defined but maybe just deleted */
+        z = getSlot(B, anchor);                /* needs the just calculated key */
+        setfunkey(B, getkey(anchor, z));       /* set slot to delete in anchor */
+        setentry(left, numentries(left), getfunkey(B), getfirstnode(right));
+    }
+    else
+        setnextnode(left, getnextnode(right));
+
+    for (x = numentries(left) + 1, y = 1; y <= numentries(right); x++, y++) {
+        incentries(left);
+#ifdef DEBUG_BTREE
+        if (numentries(left) > getfanout(B))
+            DebugBreak();
+#endif
+        xferentry(right, y, left, x);  /* transfer entries to left node */
+    }
+    clearentries(right);
+
+    if (numentries(left) > getminfanout(B, left))
+        clrflag(left, FEWEST);
+    if (numentries(left) == getfanout(B))
+        setflag(left, isFULL);         /* never happens in even size nodes */
+
+    if (getmergepath(B) == left || getmergepath(B) == right)
+        setmergepath(B, NONODE);               /* indicate rebalancing is complete */
+
+#ifdef DEBUG_BTREE
+    showNode(B, "post-merge anchor", anchor);
+    showNode(B, "post-merge left", left);
+    showNode(B, "post-merge right", right);
+#endif
+    return right;
+}
+
+
+/******   shift entries in a node pair & adjust anchor key value   *****/
+static Nptr 
+shift(Tree *B, Nptr left, Nptr right, Nptr anchor)
+{
+    int        i, x, y, z;
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "SHIFT:  left %d, right %d, anchor %d.\n", 
+             getnodenumber(B, left), 
+             getnodenumber(B, right), 
+             getnodenumber(B, anchor));
+    OutputDebugString(B->message);
+    showNode(B, "pre-shift anchor", anchor);
+    showNode(B, "pre-shift left", left);
+    showNode(B, "pre-shift right", right);
+#endif
+
+    i = isinternal(left);
+    
+    if (numentries(left) < numentries(right)) {        /* shift entries to left */
+        y = (numentries(right) - numentries(left)) >> 1;
+        x = numentries(left) + y;
+        setfunkey(B, getkey(right, y + 1 - i));        /* set new anchor key value */
+        z = getSlot(B, anchor);                        /* find slot in anchor node */
+#ifdef DEBUG_BTREE
+        if (z == 0 && !isroot(anchor))
+            DebugBreak();
+#endif
+        if (i) {                                       /* move out old anchor value */
+            decentries(right);                 /* adjust for shifting anchor */
+            incentries(left);
+#ifdef DEBUG_BTREE
+            if (numentries(left) > getfanout(B))
+                DebugBreak();
+#endif
+            setentry(left, numentries(left), getkey(anchor, z), getfirstnode(right));
+            setfirstnode(right, getnode(right, y + 1 - i));
+        }
+        clrflag(right, isFULL);
+        setkey(anchor, z, getfunkey(B));               /* set new anchor value */
+        for (z = y, y -= i; y > 0; y--, x--) {
+            decentries(right);                 /* adjust entry count */
+            incentries(left);
+#ifdef DEBUG_BTREE
+            if (numentries(left) > getfanout(B))
+                DebugBreak();
+#endif
+            xferentry(right, y, left, x);              /* transfer entries over */
+        }
+
+        for (x = 1; x <= numentries(right); x++)       /* adjust reduced node */
+            pullentry(right, x, z);
+    }
+    else if (numentries(left) > numentries(right)) {                                   /* shift entries to right */
+        y = (numentries(left) - numentries(right)) >> 1;
+        x = numentries(left) - y + 1;
+
+        for (z = numentries(right); z > 0; z--)        /* adjust increased node */
+            pushentry(right, z, y);
+        
+        setfunkey(B, getkey(left, x));                 /* set new anchor key value */
+        z = getSlot(B, anchor) + 1;
+        if (i) {
+            decentries(left);
+            incentries(right);
+#ifdef DEBUG_BTREE
+            if (numentries(right) > getfanout(B))
+                DebugBreak();
+#endif
+            setentry(right, y, getkey(anchor, z), getfirstnode(right));
+            setfirstnode(right, getnode(left, x));
+        }
+        clrflag(left, isFULL);
+        setkey(anchor, z, getfunkey(B));
+        for (x = numentries(left) + i, y -= i; y > 0; y--, x--) {
+            decentries(left);
+            incentries(right);
+#ifdef DEBUG_BTREE
+            if (numentries(right) > getfanout(B))
+                DebugBreak();
+#endif
+            xferentry(left, x, right, y);              /* transfer entries over */
+            clrentry(left, x);
+        }
+    } 
+#ifdef DEBUG_BTREE
+    else {
+        DebugBreak();
+    }
+#endif /* DEBUG_BTREE */
+
+    if (numentries(left) > getminfanout(B, left))              /* adjust node flags */
+        clrflag(left, FEWEST);                 /* never happens in 2-3+trees */
+    else
+        setflag(left, FEWEST);
+    if (numentries(right) > getminfanout(B, right))
+        clrflag(right, FEWEST);                        /* never happens in 2-3+trees */
+    else
+        setflag(right, FEWEST);
+    setmergepath(B, NONODE);
+
+#ifdef DEBUG_BTREE
+    sprintf(B->message, "SHIFT:  left %d, right %d.\n", getnodenumber(B, left), getnodenumber(B, right));
+    OutputDebugString(B->message);
+    showNode(B, "post-shift anchor", anchor);
+    showNode(B, "post-shift left", left);
+    showNode(B, "post-shift right", right);
+#endif
+
+    return NONODE;
+}
+
+
+static void
+_clrentry(Nptr node, int entry)
+{
+    if (getkey(node,entry).name != NULL) {
+        free(getkey(node,entry).name);
+        getkey(node,entry).name = NULL;
+    }
+    getnode(node,entry) = NONODE;
+}
+
+static void
+_pushentry(Nptr node, int entry, int offset) 
+{
+    if (getkey(node,entry + offset).name != NULL)
+        free(getkey(node,entry + offset).name);
+    getkey(node,entry + offset).name = strdup(getkey(node,entry).name);
+#ifdef DEBUG_BTREE
+    if ( getnode(node, entry) == NONODE )
+        DebugBreak();
+#endif
+    getnode(node,entry + offset) = getnode(node,entry);
+}
+
+static void
+_pullentry(Nptr node, int entry, int offset)
+{
+    if (getkey(node,entry).name != NULL)
+        free(getkey(node,entry).name);
+    getkey(node,entry).name = strdup(getkey(node,entry + offset).name);
+#ifdef DEBUG_BTREE
+    if ( getnode(node, entry + offset) == NONODE )
+        DebugBreak();
+#endif
+    getnode(node,entry) = getnode(node,entry + offset);
+}
+
+static void
+_xferentry(Nptr srcNode, int srcEntry, Nptr destNode, int destEntry)
+{
+    if (getkey(destNode,destEntry).name != NULL)
+        free(getkey(destNode,destEntry).name);
+    getkey(destNode,destEntry).name = strdup(getkey(srcNode,srcEntry).name);
+#ifdef DEBUG_BTREE
+    if ( getnode(srcNode, srcEntry) == NONODE )
+        DebugBreak();
+#endif
+    getnode(destNode,destEntry) = getnode(srcNode,srcEntry);
+}
+
+static void
+_setentry(Nptr node, int entry, keyT key, Nptr downNode)
+{
+    if (getkey(node,entry).name != NULL)
+        free(getkey(node,entry).name);
+    getkey(node,entry).name = strdup(key.name);
+#ifdef DEBUG_BTREE
+    if ( downNode == NONODE )
+        DebugBreak();
+#endif
+    getnode(node,entry) = downNode;
+}
+
+
+/***********************************************************************\
+|      Empty Node Utilities                                            |
+\***********************************************************************/
+
+/*********************   Set up initial pool of free nodes   ***********/
+static void 
+initFreeNodePool(Tree *B, int quantity)
+{
+    int        i;
+    Nptr       n, p;
+
+    setfirstallnode(B, NONODE);
+    setfirstfreenode(B, NONODE);
+
+    for (i = 0, p = NONODE; i < quantity; i++) {
+        n = malloc(sizeof(*n));
+        memset(n, 0, sizeof(*n));
+        setnodenumber(B,n,i);
+
+        if (p) {
+            setnextnode(p, n);         /* insert node into free node list */
+            setallnode(p, n);
+        } else {
+            setfirstfreenode(B, n);
+            setfirstallnode(B, n);
+        }
+        p = n;
+    }
+    setnextnode(p, NONODE);            /* indicates end of free node list */
+    setallnode(p, NONODE);              /* indicates end of all node list */
+    
+    setpoolsize(B, quantity);
+}
+
+
+/*******************   Cleanup Free Node Pool **************************/
+
+static void
+cleanupNodePool(Tree *B)
+{
+    int i, j;
+    Nptr node, next;
+
+    for ( i=0, node = getfirstallnode(B); node != NONODE && i<getpoolsize(B); node = next, i++ ) {
+        if (isdata(node)) {
+            if ( getdatakey(node).name )
+                free(getdatakey(node).name);
+        } else { /* data node */
+            for ( j=1; j<=getfanout(B); j++ ) {
+                if (getkey(node, j).name)
+                    free(getkey(node, j).name);
+            }
+        }
+        next = getallnode(node);
+        free(node);
+    }
+}
+
+/**************   take a free B+tree node from the pool   **************/
+static Nptr 
+getFreeNode(Tree *B)
+{
+    Nptr newNode = getfirstfreenode(B);
+
+    if (newNode != NONODE) {
+        setfirstfreenode(B, getnextnode(newNode));     /* adjust free node list */
+        setnextnode(newNode, NONODE);                  /* remove node from list */
+    }
+    else {
+        newNode = malloc(sizeof(*newNode));
+        memset(newNode, 0, sizeof(*newNode));
+
+        setallnode(newNode, getfirstallnode(B));
+        setfirstallnode(B, newNode);
+        setnodenumber(B, newNode, getpoolsize(B));
+        setpoolsize(B, getpoolsize(B) + 1);
+    }
+
+    clearflags(newNode);                        /* Sets MAGIC */
+    return newNode;
+}
+
+
+/*************   return a deleted B+tree node to the pool   ************/
+static void 
+putFreeNode(Tree *B, Nptr node)
+{
+    int i;
+
+    if (isntnode(node))
+        return;
+
+    if (isdata(node)) {
+        if ( getdatakey(node).name )
+            free(getdatakey(node).name);
+    } else {    /* data node */
+        for ( i=1; i<=getfanout(B); i++ ) {
+            if (getkey(node, i).name)
+                free(getkey(node, i).name);
+        }
+    }
+
+    /* free nodes do not have MAGIC set */
+    memset(&nAdr(node), 0, sizeof(nAdr(node)));
+    setnextnode(node, getfirstfreenode(B));    /* add node to list */
+    setfirstfreenode(B, node);                 /* set it to be list head */
+}
+
+
+/*******   fill a free data node with a key and associated data   ******/
+static Nptr 
+getDataNode(Tree *B, keyT key, dataT data)
+{
+    Nptr       newNode = getFreeNode(B);
+
+    setflag(newNode, isDATA);
+    getdatakey(newNode).name = strdup(key.name);
+    getdatavalue(newNode) = data;
+    getdatanext(newNode) = NONODE;
+
+    return newNode;
+}
+
+
+#ifdef DEBUG_BTREE
+/***********************************************************************\
+|      B+tree Printing Utilities                                       |
+\***********************************************************************/
+
+/***********************   B+tree node printer   ***********************/
+void showNode(Tree *B, const char * where, Nptr n)
+{
+    int x;
+
+    sprintf(B->message, "-  --  --  --  --  --  --  --  --  --  --  --  -\n");
+    OutputDebugString(B->message);
+    sprintf(B->message, "| %-20s                        |\n", where);
+    OutputDebugString(B->message);
+    sprintf(B->message, "| node %6d                 ", getnodenumber(B, n));
+    OutputDebugString(B->message);
+    sprintf(B->message, "  magic    %4x  |\n", getmagic(n));
+    OutputDebugString(B->message);
+    sprintf(B->message, "-  --  --  --  --  --  --  --  --  --  --  --  -\n");
+    OutputDebugString(B->message);
+    sprintf(B->message, "| flags   %1d%1d%1d%1d ", isfew(n), isfull(n), isroot(n), isleaf(n));
+    OutputDebugString(B->message);
+    sprintf(B->message, "| keys = %5d ", numentries(n));
+    OutputDebugString(B->message);
+    sprintf(B->message, "| node = %6d  |\n", getnodenumber(B, getfirstnode(n)));
+    OutputDebugString(B->message);
+    for (x = 1; x <= numentries(n); x++) {
+        sprintf(B->message, "| entry %6d ", x);
+        OutputDebugString(B->message);
+        sprintf(B->message, "| key = %6s ", getkey(n, x).name);
+        OutputDebugString(B->message);
+        sprintf(B->message, "| node = %6d  |\n", getnodenumber(B, getnode(n, x)));
+        OutputDebugString(B->message);
+    }
+    sprintf(B->message, "-  --  --  --  --  --  --  --  --  --  --  --  -\n");
+    OutputDebugString(B->message);
+}
+
+/******************   B+tree class variable printer   ******************/
+void showBtree(Tree *B)
+{
+    sprintf(B->message, "-  --  --  --  --  --  -\n");
+    OutputDebugString(B->message);
+    sprintf(B->message, "|  B+tree  %10p  |\n", (void *) B);
+    OutputDebugString(B->message);
+    sprintf(B->message, "-  --  --  --  --  --  -\n");
+    OutputDebugString(B->message);
+    sprintf(B->message, "|  root        %6d  |\n", getnodenumber(B, getroot(B)));
+    OutputDebugString(B->message);
+    sprintf(B->message, "|  leaf        %6d  |\n", getnodenumber(B, getleaf(B)));
+    OutputDebugString(B->message);
+    sprintf(B->message, "|  fanout         %3d  |\n", getfanout(B) + 1);
+    OutputDebugString(B->message);
+    sprintf(B->message, "|  minfanout      %3d  |\n", getminfanout(B, getroot(B)) + 1);
+    OutputDebugString(B->message);
+    sprintf(B->message, "|  height         %3d  |\n", gettreeheight(B));
+    OutputDebugString(B->message);
+    sprintf(B->message, "|  freenode    %6d  |\n", getnodenumber(B, getfirstfreenode(B)));
+    OutputDebugString(B->message);
+    sprintf(B->message, "|  theKey      %6s  |\n", getfunkey(B).name);
+    OutputDebugString(B->message);
+    sprintf(B->message, "|  theData     %d.%d.%d |\n", getfundata(B).volume,
+             getfundata(B).vnode, getfundata(B).unique);
+    OutputDebugString(B->message);
+    sprintf(B->message, "-  --  --  --  --  --  -\n");
+    OutputDebugString(B->message);
+}
+
+void 
+listBtreeNodes(Tree *B, const char * parent_desc, Nptr node)
+{
+    int i;
+    char thisnode[64];
+    dataT data;
+
+    if (isntnode(node)) {
+        sprintf(B->message, "%s - NoNode!!!\n");
+        OutputDebugString(B->message);
+        return;
+    } 
+    
+    if (!isnode(node))
+    {
+        data = getdatavalue(node);
+        sprintf(B->message, "%s - data node %d (%d.%d.%d)\n", 
+                 parent_desc, getnodenumber(B, node),
+                 data.volume, data.vnode, data.unique);
+        OutputDebugString(B->message);
+        return;
+    } else 
+        showNode(B, parent_desc, node);
+
+    if ( isinternal(node) || isroot(node) ) {
+        sprintf(thisnode, "parent %6d", getnodenumber(B , node));
+
+        OutputDebugString(B->message);
+        for ( i= isinternal(node) ? 0 : 1; i <= numentries(node); i++ ) {
+            listBtreeNodes(B, thisnode, i == 0 ? getfirstnode(node) : getnode(node, i));
+        }
+    }
+}
+
+/***********************   B+tree data printer   ***********************/
+void 
+listBtreeValues(Tree *B, Nptr n, int num)
+{
+    int slot;
+    keyT prev = {""};
+    dataT data;
+
+    for (slot = 1; (n != NONODE) && num && numentries(n); num--) {
+        if (comparekeys(B)(getkey(n, slot),prev, 0) < 0) {
+            sprintf(B->message, "BOMB %8s\n", getkey(n, slot).name);
+            OutputDebugString(B->message);
+            DebugBreak();
+        }
+        prev = getkey(n, slot);
+        data = getdatavalue(getnode(n, slot));
+        sprintf(B->message, "%8s (%d.%d.%d)\n", 
+                prev.name, data.volume, data.vnode, data.unique);
+        OutputDebugString(B->message);
+        if (++slot > numentries(n))
+            n = getnextnode(n), slot = 1;
+    }   
+    sprintf(B->message, "\n\n");
+    OutputDebugString(B->message);
+}
+
+/********************   entire B+tree data printer   *******************/
+void 
+listAllBtreeValues(Tree *B)
+{
+    listBtreeValues(B, getleaf(B), BTERROR);
+}
+#endif /* DEBUG_BTREE */
+
+void 
+findAllBtreeValues(Tree *B)
+{
+    int num = -1;
+    Nptr n = getleaf(B), l;
+    int slot;
+    keyT prev = {""};
+
+    for (slot = 1; (n != NONODE) && num && numentries(n); num--) {
+        if (comparekeys(B)(getkey(n, slot),prev, 0) < 0) {
+            sprintf(B->message,"BOMB %8s\n", getkey(n, slot).name);
+            OutputDebugString(B->message);
+#ifdef DEBUG_BTREE
+            DebugBreak();
+#endif
+        }
+        prev = getkey(n, slot);
+        l = bplus_Lookup(B, prev);
+        if ( l != n ){
+            if (l == NONODE)
+                sprintf(B->message,"BOMB %8s cannot be found\n", prev.name);
+            else 
+                sprintf(B->message,"BOMB lookup(%8s) finds wrong node\n", prev.name);
+            OutputDebugString(B->message);
+#ifdef DEBUG_BTREE
+            DebugBreak();
+#endif
+        }
+
+        if (++slot > numentries(n))
+            n = getnextnode(n), slot = 1;
+    }   
+}
+
+static int
+compareKeys(keyT key1, keyT key2, int flags)
+{
+    if (flags & EXACT_MATCH)
+        return strcmp(key1.name, key2.name);
+    else
+        return stricmp(key1.name, key2.name);
+}
+
+/* Look up a file name in directory.
+
+   On entry:
+       op->scp->dirlock is read locked
+
+   On exit:
+       op->scp->dirlock is read locked
+*/
+int
+cm_BPlusDirLookup(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
+{
+    int rc = EINVAL;
+    keyT key = {entry};
+    Nptr leafNode = NONODE;
+    LARGE_INTEGER start, end;
+
+    if (op->scp->dirBplus == NULL || 
+        op->dataVersion != op->scp->dirDataVersion) {
+        rc = EINVAL;
+        goto done;
+    }
+
+    QueryPerformanceCounter(&start);
+
+    leafNode = bplus_Lookup(op->scp->dirBplus, key);
+    if (leafNode != NONODE) {
+        int         slot;
+        Nptr        firstDataNode, dataNode, nextDataNode;
+        int         exact = 0;
+        int         count = 0;
+
+        /* Found a leaf that matches the key via a case-insensitive
+         * match.  There may be one or more data nodes that match.
+         * If we have an exact match, return that.
+         * If we have an ambiguous match, return an error.
+         * If we have only one inexact match, return that.
+         */
+        slot = findKey(op->scp->dirBplus, leafNode, 1, numentries(leafNode));
+        firstDataNode = getnode(leafNode, slot);
+
+        for ( dataNode = firstDataNode; dataNode; dataNode = nextDataNode) {
+            count++;
+            if (!comparekeys(op->scp->dirBplus)(key, getdatakey(dataNode), EXACT_MATCH) ) {
+                exact = 1;
+                break;
+            }
+            nextDataNode = getdatanext(dataNode);
+        }
+
+        if (exact) {
+            *cfid = getdatavalue(dataNode);
+            rc = 0;
+            bplus_lookup_hits++;
+        } else if (count == 1) {
+            *cfid = getdatavalue(firstDataNode);
+            rc = CM_ERROR_INEXACT_MATCH;
+            bplus_lookup_hits_inexact++;
+        } else {
+                rc = CM_ERROR_AMBIGUOUS_FILENAME;
+            bplus_lookup_ambiguous++;
+        } 
+    } else {
+            rc = ENOENT;
+        bplus_lookup_misses++;
+    }
+
+    QueryPerformanceCounter(&end);
+
+    bplus_lookup_time += (end.QuadPart - start.QuadPart);
+
+  done:
+    return rc;
+}
+
+
+/* 
+   On entry:
+       op->scp->dirlock is write locked
+
+   On exit:
+       op->scp->dirlock is write locked
+*/
+long cm_BPlusDirCreateEntry(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
+{
+    long rc = 0;
+    keyT key = {entry};
+    LARGE_INTEGER start, end;
+
+    if (op->scp->dirBplus == NULL || 
+        op->dataVersion != op->scp->dirDataVersion) {
+        rc = EINVAL;
+        goto done;
+    }
+
+    QueryPerformanceCounter(&start);
+    bplus_create_entry++;
+
+    insert(op->scp->dirBplus, key, *cfid);
+
+    QueryPerformanceCounter(&end);
+
+    bplus_create_time += (end.QuadPart - start.QuadPart);
+
+  done:
+
+    return rc;
+}
+
+/* 
+   On entry:
+       op->scp->dirlock is write locked
+
+   On exit:
+       op->scp->dirlock is write locked
+*/
+int  cm_BPlusDirDeleteEntry(cm_dirOp_t * op, char *entry)
+{
+    long rc = 0;
+    keyT key = {entry};
+    LARGE_INTEGER start, end;
+
+    if (op->scp->dirBplus == NULL || 
+        op->dataVersion != op->scp->dirDataVersion) {
+        rc = EINVAL;
+        goto done;
+    }
+
+    QueryPerformanceCounter(&start);
+
+    bplus_remove_entry++;
+
+    if (op->scp->dirBplus) {
+        delete(op->scp->dirBplus, key);
+    }
+    
+    QueryPerformanceCounter(&end);
+
+    bplus_remove_time += (end.QuadPart - start.QuadPart);
+
+  done:
+    return rc;
+      
+}
+
+static 
+int cm_BPlusDirFoo(struct cm_scache *scp, struct cm_dirEntry *dep,
+                   void *dummy, osi_hyper_t *entryOffsetp)
+{
+    keyT   key = {dep->name};
+    dataT  data = {scp->fid.cell, scp->fid.volume, ntohl(dep->fid.vnode), ntohl(dep->fid.unique)};
+
+    /* the Write lock is held in cm_BPlusDirBuildTree() */
+    insert(scp->dirBplus, key, data);
+
+    return 0;
+}
+
+
+/*
+ *   scp->dirlock must be writeLocked before call
+ *
+ *   scp->mutex must not be held
+ */
+long cm_BPlusDirBuildTree(cm_scache_t *scp, cm_user_t *userp, cm_req_t* reqp)
+{
+    long rc = 0;
+    osi_hyper_t thyper;
+    LARGE_INTEGER start, end;
+
+    osi_assert(scp->dirBplus == NULL);
+
+    QueryPerformanceCounter(&start);
+    bplus_build_tree++;
+
+    if (scp->dirBplus == NULL) {
+        scp->dirBplus = initBtree(64, MAX_FANOUT, compareKeys);
+    }
+    if (scp->dirBplus == NULL) {
+        rc = ENOMEM;
+    } else {
+        thyper.LowPart = 0;
+        thyper.HighPart = 0;
+        rc = cm_ApplyDir(scp, cm_BPlusDirFoo, NULL, &thyper, userp, reqp, NULL);
+    }
+
+    QueryPerformanceCounter(&end);
+
+    bplus_build_time += (end.QuadPart - start.QuadPart);
+
+    return rc;
+}
+
+void cm_BPlusDumpStats(void)
+{
+    afsi_log("B+ Lookup    Hits: %-8d", bplus_lookup_hits);
+    afsi_log("     Inexact Hits: %-8d", bplus_lookup_hits_inexact);
+    afsi_log("   Ambiguous Hits: %-8d", bplus_lookup_ambiguous);
+    afsi_log("           Misses: %-8d", bplus_lookup_misses);
+    afsi_log("           Create: %-8d", bplus_create_entry);
+    afsi_log("           Remove: %-8d", bplus_remove_entry);
+    afsi_log("       Build Tree: %-8d", bplus_build_tree);
+    afsi_log("        Free Tree: %-8d", bplus_free_tree);
+    afsi_log("         DV Error: %-8d", bplus_dv_error);
+
+    afsi_log("B+ Time    Lookup: %-16I64d", bplus_lookup_time);
+    afsi_log("           Create: %-16I64d", bplus_create_time);
+    afsi_log("           Remove: %-16I64d", bplus_remove_time);
+    afsi_log("            Build: %-16I64d", bplus_build_time);
+    afsi_log("             Free: %-16I64d", bplus_free_time);
+}
+#endif /* USE_BPLUS */
index d507d30..31fb31d 100644 (file)
@@ -1 +1,281 @@
-/* place holder for B+ tree header */
+/* 
+ * Copyright 2007 Secure Endpoints Inc.  
+ * 
+ * All Rights Reserved.
+ *
+ * This software has been released under the terms of the IBM Public
+ * License.  For details, see the LICENSE file in the top-level source
+ * directory or online at http://www.openafs.org/dl/license10.html
+ *
+ * Thanks to Jan Jannink for B+ tree algorithms.
+ */
+
+#ifndef BPLUSTREE_H
+#define BPLUSTREE_H 1
+
+/********      flag bits (5 of 16 used, 11 for magic value)    ********/
+
+/* bits set at node creation/split/merge */
+#define isLEAF         0x01
+#define isROOT         0x02
+#define isDATA          0x04
+
+/* bits set at key insertion/deletion */
+#define isFULL         0x08
+#define FEWEST         0x10
+#define FLAGS_MASK     0xFF
+
+/* identifies data as being a B+tree node */
+#define BTREE_MAGIC          0xBEE0BEE0
+#define BTREE_MAGIC_MASK      0xFFFFFFFF
+
+
+/*************************     constants       ************************/
+
+/* The maximum number of Entrys in one Node */
+#define MAX_FANOUT     9
+
+/* corresponds to a NULL node pointer value */
+#define NONODE          NULL
+
+/* special node slot values used in key search */
+#define BTERROR                -1
+#define BTUPPER                -2
+#define BTLOWER                -3
+
+
+/************************* Data Types **********************************/
+typedef struct node    *Nptr;
+
+typedef struct key {
+    char *name;
+} keyT;
+
+typedef cm_fid_t dataT;
+
+typedef struct entry {
+    keyT       key;
+    Nptr       downNode;
+} Entry;
+
+typedef struct inner {
+    short      pairs;
+    Nptr       firstNode;
+} Inner;
+
+
+typedef struct leaf {
+    short      pairs;
+    Nptr       nextNode;
+} Leaf;
+
+typedef struct data {
+    keyT        key;
+    dataT      value;
+    Nptr        next;
+} Data;
+
+typedef struct node {
+    Nptr                pool;
+    unsigned short      number;
+    unsigned short     flags;
+    unsigned long       magic;
+    union {
+        Entry  e[MAX_FANOUT];  /* allows access to entry array */
+        Inner  i;
+        Leaf   l;
+        Data   d;
+    } X;
+} Node;
+
+
+typedef int (*KeyCmp)(keyT, keyT, int);
+#define EXACT_MATCH 0x01
+
+
+typedef struct tree {
+    unsigned int        flags;          /* tree flags */
+    unsigned int       poolsize;       /* # of nodes allocated for tree */
+    Nptr               root;           /* pointer to root node */
+    Nptr               leaf;           /* pointer to first leaf node in B+tree */
+    unsigned int       fanout;         /* # of pointers to other nodes */
+    unsigned int       minfanout;      /* usually minfanout == ceil(fanout/2) */
+    unsigned int       height;         /* nodes traversed from root to leaves */
+    Nptr               pool;           /* list of all nodes */
+    Nptr                empty;          /* list of empty nodes */
+    keyT               theKey;         /*  the key value used in tree operations */
+    dataT              theData;        /*  data used for insertions/deletions */
+    union {                            /* nodes to change in insert and delete */
+        Nptr   split;
+        Nptr   merge;
+    } branch;
+    KeyCmp     keycmp;                 /* pointer to function comparing two keys */
+    char message[1024];
+} Tree;
+
+#define TREE_FLAGS_MASK                 0x01
+#define TREE_FLAG_UNIQUE_KEYS           0x01
+
+/************************* B+ tree Public functions ****************/
+Tree   *initBtree(unsigned int poolsz, unsigned int fan, KeyCmp keyCmp);
+void   freeBtree(Tree *B);
+
+#ifdef DEBUG_BTREE
+void   showNode(Tree *B, const char * str, Nptr node);
+void   showBtree(Tree *B);
+void   listBtreeValues(Tree *B, Nptr start, int count);
+void   listAllBtreeValues(Tree *B);
+void    listBtreeNodes(Tree *B, const char * where, Nptr node);
+void    findAllBtreeValues(Tree *B);
+#endif
+
+void   insert(Tree *B, keyT key, dataT data);
+void   delete(Tree *B, keyT key);
+Nptr   lookup(Tree *B, keyT key);
+
+/******************* cache manager directory operations ***************/
+int  cm_BPlusDirLookup(cm_dirOp_t * op, char *entry, cm_fid_t * cfid);
+long cm_BPlusDirCreateEntry(cm_dirOp_t * op, char *entry, cm_fid_t * cfid);
+int  cm_BPlusDirDeleteEntry(cm_dirOp_t * op, char *entry);
+long cm_BPlusDirBuildTree(cm_scache_t *scp, cm_user_t *userp, cm_req_t* reqp);
+void cm_BPlusDumpStats(void);
+
+extern afs_uint32 bplus_free_tree;
+extern afs_uint32 bplus_dv_error;
+extern afs_uint64 bplus_free_time;
+
+/************ Accessor Macros *****************************************/
+                       
+/* low level definition of Nptr value usage */
+#define nAdr(n) (n)->X
+
+/* access keys and pointers in a node */
+#define getkey(j, q) (nAdr(j).e[(q)].key)
+#define getnode(j, q) (nAdr(j).e[(q)].downNode)
+#define setkey(j, q, v) ((q > 0) ? nAdr(j).e[(q)].key.name = strdup((v).name) : NULL)
+#define setnode(j, q, v) (nAdr(j).e[(q)].downNode = (v))
+
+/* access tree flag values */
+#define settreeflags(B,v) (B->flags |= (v & TREE_FLAGS_MASK))
+#define gettreeflags(B)   (B->flags)
+#define cleartreeflags(B) (B->flags = 0);
+
+/* access node flag values */
+#define setflag(j, v) ((j)->flags |= (v & FLAGS_MASK))
+#define clrflag(j, v) ((j)->flags &= ~(v & FLAGS_MASK))
+#define getflags(j) ((j)->flags & FLAGS_MASK)
+#define getmagic(j)  ((j)->magic & BTREE_MAGIC_MASK)
+#define clearflags(j) ((j)->flags = 0, (j)->magic = BTREE_MAGIC)
+
+
+/* check that a node is in fact a node */
+#define isnode(j) (((j) != NONODE) && (((j)->magic & BTREE_MAGIC_MASK) == BTREE_MAGIC) && !((j)->flags & isDATA))
+#define isntnode(j) ((j) == NONODE)
+
+
+/* test individual flag values */
+#define isinternal(j) (((j)->flags & isLEAF) == 0)
+#define isleaf(j) (((j)->flags & isLEAF) == isLEAF)
+#define isdata(j) (((j)->flags & isDATA) == isDATA)
+#ifndef DEBUG_BTREE
+#define isroot(j) (((j)->flags & isROOT) == isROOT)
+#define isfull(j) (((j)->flags & isFULL) == isFULL)
+#define isfew(j) (((j)->flags & FEWEST) == FEWEST)
+#else
+#define isroot(j) _isRoot(B, j)
+#define isfew(j) _isFew(B, j)
+#define isfull(j) _isFull(B, j)
+#endif
+
+/* manage number of keys in a node */
+#define numentries(j) (nAdr(j).i.pairs)
+#define clearentries(j) (nAdr(j).i.pairs = 0)
+#define incentries(j) (nAdr(j).i.pairs++)
+#define decentries(j) (nAdr(j).i.pairs--)
+
+
+/* manage first/last node pointers in internal nodes */
+#define setfirstnode(j, v) (nAdr(j).i.firstNode = (v))
+#define getfirstnode(j) (nAdr(j).i.firstNode)
+#define getlastnode(j) (nAdr(j).e[nAdr(j).i.pairs].downNode)
+
+
+/* manage pointers to next nodes in leaf nodes */
+/* also used for free nodes list */
+#define setnextnode(j, v) (nAdr(j).l.nextNode = (v))
+#define getnextnode(j) (nAdr(j).l.nextNode)
+
+/* manage pointers to all nodes list */
+#define setallnode(j, v) ((j)->pool = (v))
+#define getallnode(j) ((j)->pool)
+
+/* manage access to data nodes */
+#define getdata(j) (nAdr(j).d)
+#define getdatavalue(j) (getdata(j).value)
+#define getdatakey(j)   (getdata(j).key)
+#define getdatanext(j)  (getdata(j).next)
+
+/* shift/transfer entries for insertion/deletion */
+#define clrentry(j, q) _clrentry(j,q)
+#define pushentry(j, q, v) _pushentry(j, q, v)
+#define pullentry(j, q, v) _pullentry(j, q, v)
+#define xferentry(j, q, v, z) _xferentry(j, q, v, z)
+#define setentry(j, q, v, z) _setentry(j, q, v, z)
+
+
+/* access key and data values for B+tree methods */
+/* pass values to getSlot(), descend...() */
+#define getfunkey(B) ((B)->theKey)
+#define getfundata(B) ((B)->theData)
+#define setfunkey(B,v) ((B)->theKey = (v))
+#define setfundata(B,v) ((B)->theData = (v))
+
+/* define number of B+tree nodes for free node pool */
+#define getpoolsize(B) ((B)->poolsize)
+#define setpoolsize(B,v) ((B)->poolsize = (v))
+
+/* locations from which tree access begins */
+#define getroot(B) ((B)->root)
+#define setroot(B,v) ((B)->root = (v))
+#define getleaf(B) ((B)->leaf)
+#define setleaf(B,v) ((B)->leaf = (v))
+
+/* define max/min number of pointers per node */
+#define getfanout(B) ((B)->fanout)
+#define setfanout(B,v) ((B)->fanout = (v) - 1)
+#define getminfanout(B,j) (isroot(j) ? (isleaf(j) ? 0 : 1) : (isleaf(j) ? (B)->fanout - (B)->minfanout: (B)->minfanout))
+#define setminfanout(B,v) ((B)->minfanout = (v) - 1)
+
+/* manage B+tree height */
+#define inittreeheight(B) ((B)->height = 0)
+#define inctreeheight(B) ((B)->height++)
+#define dectreeheight(B) ((B)->height--)
+#define gettreeheight(B) ((B)->height)
+
+/* access pool of free nodes */
+#define getfirstfreenode(B) ((B)->empty)
+#define setfirstfreenode(B,v) ((B)->empty = (v))
+
+/* access all node list */
+#define getfirstallnode(B) ((B)->pool)
+#define setfirstallnode(B,v) ((B)->pool = (v))
+
+/* handle split/merge points during insert/delete */
+#define getsplitpath(B) ((B)->branch.split)
+#define setsplitpath(B,v) ((B)->branch.split = (v))
+#define getmergepath(B) ((B)->branch.merge)
+#define setmergepath(B,v) ((B)->branch.merge = (v))
+
+/* exploit function to compare two B+tree keys */
+#define comparekeys(B) (*(B)->keycmp)
+#define setcomparekeys(B,v) ((B)->keycmp = (v))
+
+/* location containing B+tree class variables */
+#define setbplustree(B,v) ((B) = (Tree *)(v))
+
+/* representation independent node numbering */
+#define setnodenumber(B,v,q) ((v)->number = (q))
+#define getnodenumber(B,v) ((v) != NONODE ? (v)->number : -1)
+
+#endif /* BPLUSTREE_H */
+
index 758af9a..897e3fc 100644 (file)
 #include <malloc.h>
 #include <osi.h>
 #include "afsd.h"
+#ifdef USE_BPLUS
+#include "cm_btree.h"
+#endif
 #include <rx/rx.h>
 
 
 afs_int32 DErrno;
 
+afs_uint32 dir_lookup_hits = 0;
+afs_uint32 dir_lookup_misses = 0;
+afs_uint32 dir_create_entry = 0;
+afs_uint32 dir_remove_entry = 0;
+
+afs_uint64 dir_lookup_time = 0;
+afs_uint64 dir_create_time = 0;
+afs_uint64 dir_remove_time = 0;
+
+afs_int32  cm_BPlusTrees = 1;
+
+void cm_DirDumpStats(void)
+{
+    afsi_log("Dir Lookup   Hits: %-8d", dir_lookup_hits);
+    afsi_log("           Misses: %-8d", dir_lookup_misses);
+    afsi_log("           Create: %-8d", dir_create_entry);
+    afsi_log("           Remove: %-8d", dir_remove_entry);
+
+    afsi_log("Dir Times  Lookup: %-16I64d", dir_lookup_time);
+    afsi_log("           Create: %-16I64d", dir_create_time);
+    afsi_log("           Remove: %-16I64d", dir_remove_time);
+}
+
+
 /* Local static prototypes */
 static long
 cm_DirGetBlob(cm_dirOp_t * op,
@@ -78,10 +105,10 @@ cm_NameEntries(char *namep, long *lenp)
    entry is a string name.
 
    On entry:
-       op->scp->mx is locked
+       op->scp->mx is unlocked
 
    On exit:
-       op->scp->mx is locked
+       op->scp->mx is unlocked
 
    None of the directory buffers for op->scp should be locked by the
    calling thread.
@@ -91,6 +118,7 @@ cm_DirCreateEntry(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
 {
     int blobs, firstelt;
     int i;
+    LARGE_INTEGER start, end;
 
     cm_dirEntry_t *ep = NULL;
     cm_buf_t *entrybuf = NULL;
@@ -107,6 +135,10 @@ cm_DirCreateEntry(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
     if (*entry == 0)
        return EINVAL;
 
+    QueryPerformanceCounter(&start);
+
+    dir_create_entry++;
+
     osi_Log4(afsd_logp, "cm_DirCreateEntry for op 0x%p, name [%s] and fid[%d,%d]",
              op, osi_LogSaveString(afsd_logp, entry), cfid->vnode, cfid->unique);
 
@@ -118,20 +150,24 @@ cm_DirCreateEntry(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
     if (code == 0) {
         cm_DirReleasePage(op, &entrybuf, FALSE);
         cm_DirReleasePage(op, &prevptrbuf, FALSE);
-       return EEXIST;
+       code = EEXIST;
+        goto done;
     }
 
     blobs = cm_NameEntries(entry, NULL);       /* number of entries required */
     firstelt = cm_DirFindBlobs(op, blobs);
     if (firstelt < 0) {
         osi_Log0(afsd_logp, "cm_DirCreateEntry returning EFBIG");
-       return EFBIG;           /* directory is full */
+       code = EFBIG;           /* directory is full */
+        goto done;
     }
 
     /* First, we fill in the directory entry. */
     code = cm_DirGetBlob(op, firstelt, &entrybuf, &ep);
-    if (code != 0)
-       return EIO;
+    if (code != 0) {
+       code = EIO;
+        goto done;
+    }
 
     ep->flag = CM_DIR_FFIRST;
     ep->fid.vnode = htonl(cfid->vnode);
@@ -142,7 +178,8 @@ cm_DirCreateEntry(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
     code = cm_DirGetPage(op, 0, &dhpbuf, &dhp);
     if (code != 0) {
        cm_DirReleasePage(op, &entrybuf, TRUE);
-       return EIO;
+       code = EIO;
+        goto done;
     }
 
     i = cm_DirHash(entry);
@@ -155,7 +192,12 @@ cm_DirCreateEntry(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
 
     osi_Log0(afsd_logp, "cm_DirCreateEntry returning success");
 
-    return 0;
+    code = 0;
+  done:
+    QueryPerformanceCounter(&end);
+
+    dir_create_time += (end.QuadPart - start.QuadPart);
+    return code;
 }
 
 /* Return the length of a directory in pages
@@ -198,10 +240,10 @@ cm_DirLength(cm_dirOp_t * op)
 /* Delete a directory entry.
 
    On entry:
-       op->scp->mx is locked
+       op->scp->mx is unlocked
 
    On exit:
-       op->scp->mx is locked
+       op->scp->mx is unlocked
 
    None of the directory buffers for op->scp should be locked by the
    calling thread.
@@ -219,8 +261,10 @@ cm_DirDeleteEntry(cm_dirOp_t * op, char *entry)
     cm_buf_t      *pibuf = NULL;
     osi_hyper_t    thyper;
     unsigned long  junk;
-
     long code;
+    LARGE_INTEGER start, end;
+
+    QueryPerformanceCounter(&start);
 
     osi_Log2(afsd_logp, "cm_DirDeleteEntry for op 0x%p, entry [%s]",
              op, osi_LogSaveString(afsd_logp, entry));
@@ -230,9 +274,12 @@ cm_DirDeleteEntry(cm_dirOp_t * op, char *entry)
                           &pibuf, &previtem);
     if (code != 0) {
         osi_Log0(afsd_logp, "cm_DirDeleteEntry returning ENOENT");
-       return ENOENT;
+       code = ENOENT;
+        goto done;
     }
 
+    dir_remove_entry++;
+
     *previtem = firstitem->next;
     cm_DirReleasePage(op, &pibuf, TRUE);
 
@@ -250,8 +297,14 @@ cm_DirDeleteEntry(cm_dirOp_t * op, char *entry)
     cm_DirFreeBlobs(op, index, nitems);
 
     osi_Log0(afsd_logp, "cm_DirDeleteEntry returning success");
+    code = 0;
 
-    return 0;
+  done:
+    QueryPerformanceCounter(&end);
+
+    dir_remove_time += (end.QuadPart - start.QuadPart);
+
+    return code;
 }
 
 /* Find a bunch of contiguous entries; at least nblobs in a row.
@@ -444,7 +497,7 @@ cm_DirFreeBlobs(cm_dirOp_t * op, int firstblob, int nblobs)
  * directory header page are allocated, 1 to the page header, 4 to the
  * allocation map and 8 to the hash table.
  *
- * Called with op->scp->mx
+ * Called with op->scp->mx unlocked
  */
 int
 cm_DirMakeDir(cm_dirOp_t * op, cm_fid_t * me, cm_fid_t * parent)
@@ -452,7 +505,7 @@ cm_DirMakeDir(cm_dirOp_t * op, cm_fid_t * me, cm_fid_t * parent)
     int i;
     cm_dirHeader_t *dhp = NULL;
     cm_buf_t *dhpbuf = NULL;
-
+    int rc = 0;
     long code;
 
     osi_Log3(afsd_logp, "cm_DirMakeDir for op 0x%p, directory fid[%d, %d]",
@@ -461,8 +514,10 @@ cm_DirMakeDir(cm_dirOp_t * op, cm_fid_t * me, cm_fid_t * parent)
              parent->vnode, parent->unique);
 
     code = cm_DirGetPage(op, 0, &dhpbuf, &dhp);
-    if (code)
-        return 1;
+    if (code) {
+        rc = 1;
+        goto done;
+    }
 
     dhp->header.pgcount = htons(1);
     dhp->header.tag = htons(1234);
@@ -484,16 +539,17 @@ cm_DirMakeDir(cm_dirOp_t * op, cm_fid_t * me, cm_fid_t * parent)
 
     osi_Log0(afsd_logp, "cm_DirMakeDir returning success");
 
-    return 0;
+  done:
+    return rc;
 }
 
 /* Look up a file name in directory.
 
    On entry:
-       op->scp->mx is locked
+       op->scp->mx is unlocked
 
    On exit:
-       op->scp->mx is locked
+       op->scp->mx is unlocked
 
    None of the directory buffers for op->scp should be locked by the
    calling thread.
@@ -505,8 +561,11 @@ cm_DirLookup(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
     cm_buf_t      *itembuf = NULL;
     unsigned short *previtem = NULL;
     cm_buf_t      *pibuf = NULL;
-
     long code;
+    LARGE_INTEGER       start;
+    LARGE_INTEGER       end;
+
+    QueryPerformanceCounter(&start);
 
     osi_Log2(afsd_logp, "cm_DirLookup for op 0x%p, entry[%s]",
              op, osi_LogSaveString(afsd_logp, entry));
@@ -515,7 +574,9 @@ cm_DirLookup(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
                           &itembuf, &firstitem,
                           &pibuf, &previtem);
     if (code != 0) {
-        return ENOENT;
+        dir_lookup_misses++;
+        code = ENOENT;
+        goto done;
     }
 
     cm_DirReleasePage(op, &pibuf, FALSE);
@@ -530,7 +591,15 @@ cm_DirLookup(cm_dirOp_t * op, char *entry, cm_fid_t * cfid)
     osi_Log2(afsd_logp, "cm_DirLookup returning fid[%d,%d]",
              cfid->vnode, cfid->unique);
 
-    return 0;
+    dir_lookup_hits++;
+    code = 0;
+
+  done:
+    QueryPerformanceCounter(&end);
+
+    dir_lookup_time += (end.QuadPart - start.QuadPart);
+
+    return code;
 }
 
 /* Look up a file name in directory.
@@ -863,10 +932,10 @@ cm_DirFindItem(cm_dirOp_t * op,
 */
 long
 cm_BeginDirOp(cm_scache_t * scp, cm_user_t * userp, cm_req_t * reqp,
-              cm_dirOp_t * op)
+              afs_uint32 lockType, cm_dirOp_t * op)
 {
     long code;
-    int i;
+    int i, mxheld = 0;
 
     osi_Log3(afsd_logp, "Beginning dirOp[0x%p] for scp[0x%p], userp[0x%p]",
              op, scp, userp);
@@ -893,7 +962,73 @@ cm_BeginDirOp(cm_scache_t * scp, cm_user_t * userp, cm_req_t * reqp,
         op->newLength = op->length;
         op->dataVersion = scp->dataVersion;
         op->newDataVersion = op->dataVersion;
+
+#ifdef USE_BPLUS
+        lock_ObtainRead(&scp->dirlock);
+        if (!cm_BPlusTrees ||
+            (scp->dirBplus &&
+             scp->dirDataVersion == scp->dataVersion)) 
+        {
+            int mxheld = 0;
+
+            switch (lockType) {
+            case CM_DIRLOCK_NONE:
+                lock_ReleaseRead(&scp->dirlock);
+                break;
+            case CM_DIRLOCK_READ:
+                /* got it already */
+                break;
+            case CM_DIRLOCK_WRITE:
+            default:
+                lock_ReleaseRead(&scp->dirlock);
+                lock_ObtainWrite(&scp->dirlock);
+            }
+        } else {
+            lock_ReleaseRead(&scp->dirlock);
+            lock_ObtainWrite(&scp->dirlock);
+            if (scp->dirBplus && 
+                scp->dirDataVersion != scp->dataVersion)
+            {
+                bplus_dv_error++;
+                bplus_free_tree++;
+                freeBtree(scp->dirBplus);
+                scp->dirBplus = NULL;
+                scp->dirDataVersion = -1;
+            }
+
+            if (!scp->dirBplus) {
+                cm_BPlusDirBuildTree(scp, userp, reqp);
+                if (scp->dirBplus)
+                    scp->dirDataVersion = scp->dataVersion;
+            }
+
+            switch (lockType) {
+            case CM_DIRLOCK_NONE:
+                lock_ReleaseWrite(&scp->dirlock);
+                break;
+            case CM_DIRLOCK_READ:
+                lock_ConvertWToR(&scp->dirlock);
+                break;
+            case CM_DIRLOCK_WRITE:
+            default:
+                /* got it already */;
+            }
+        }
+#else
+        switch (lockType) {
+        case CM_DIRLOCK_NONE:
+            break;
+        case CM_DIRLOCK_READ:
+            lock_ObtainRead(&scp->dirlock);
+            break;
+        case CM_DIRLOCK_WRITE:
+        default:
+            lock_ObtainWrite(&scp->dirlock);
+        }
+#endif
+        op->lockType = lockType;
     } else {
+    
         cm_EndDirOp(op);
     }
 
@@ -901,7 +1036,7 @@ cm_BeginDirOp(cm_scache_t * scp, cm_user_t * userp, cm_req_t * reqp,
 }
 
 /* Check if it is safe for us to perform local directory updates.
-   Called with scp->mx held. */
+   Called with scp->mx unlocked. */
 int
 cm_CheckDirOpForSingleChange(cm_dirOp_t * op)
 {
@@ -932,7 +1067,7 @@ cm_CheckDirOpForSingleChange(cm_dirOp_t * op)
 }
 
 /* End a sequence of directory operations.  Called with op->scp->mx
-   locked.*/
+   unlocked.*/
 long
 cm_EndDirOp(cm_dirOp_t * op)
 {
@@ -947,10 +1082,39 @@ cm_EndDirOp(cm_dirOp_t * op)
     if (op->dirtyBufCount > 0) {
         /* we made changes.  We should go through the list of buffers
            and update the dataVersion for each. */
-
-        lock_ReleaseMutex(&op->scp->mx);
         code = buf_ForceDataVersion(op->scp, op->dataVersion, op->newDataVersion);
-        lock_ObtainMutex(&op->scp->mx);
+
+#ifdef USE_BPLUS
+        /* and update the data version on the B+ tree */
+        if (op->scp->dirBplus && 
+            op->scp->dirDataVersion == op->dataVersion) {
+
+            switch (op->lockType) {
+            case CM_DIRLOCK_READ:
+                lock_ReleaseRead(&op->scp->dirlock);
+                /* fall through ... */
+            case CM_DIRLOCK_NONE:
+                lock_ObtainWrite(&op->scp->dirlock);
+                op->lockType = CM_DIRLOCK_WRITE;
+                break;
+            case CM_DIRLOCK_WRITE:
+            default:
+                /* already got it */;
+            }
+            op->scp->dirDataVersion = op->newDataVersion;
+        }
+#endif
+    }
+
+    switch (op->lockType) {
+    case CM_DIRLOCK_NONE:
+        break;
+    case CM_DIRLOCK_READ:
+        lock_ReleaseRead(&op->scp->dirlock);
+        break;
+    case CM_DIRLOCK_WRITE:
+    default:
+        lock_ReleaseWrite(&op->scp->dirlock);
     }
 
     if (op->scp)
@@ -1077,7 +1241,7 @@ cm_DirOpFindBuffer(cm_dirOp_t * op, osi_hyper_t offset, cm_buf_t ** bufferpp)
 }
 
 
-/* NOTE: called with scp->mx NOT held */
+/* NOTE: called with scp->mx held or not depending on the flags */
 static int
 cm_DirOpDelBuffer(cm_dirOp_t * op, cm_buf_t * bufferp, int flags)
 {
@@ -1176,10 +1340,10 @@ cm_DirOpDelBuffer(cm_dirOp_t * op, cm_buf_t * bufferp, int flags)
    This should be called before cm_DirGetPage() is called per scp.
 
    On entry:
-     scp->mx locked
+     scp->mx unlocked
 
    On exit:
-     scp->mx locked
+     scp->mx unlocked
 
    During:
      scp->mx may be released
@@ -1189,8 +1353,10 @@ cm_DirCheckStatus(cm_dirOp_t * op)
 {
     long code;
 
+    lock_ObtainMutex(&op->scp->mx);
     code = cm_SyncOp(op->scp, NULL, op->userp, &op->req, PRSFS_LOOKUP,
                      CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
+    lock_ReleaseMutex(&op->scp->mx);
 
     osi_Log2(afsd_logp, "cm_DirCheckStatus for op 0x%p returning code 0x%x",
              op, code);
@@ -1202,7 +1368,7 @@ cm_DirCheckStatus(cm_dirOp_t * op)
    cm_DirGetPage() or any other function that returns a locked, held,
    directory page buffer.
 
-   Called with scp->mx held
+   Called with scp->mx unlocked
  */
 static long
 cm_DirReleasePage(cm_dirOp_t * op, cm_buf_t ** bufferpp, int modified)
@@ -1213,7 +1379,7 @@ cm_DirReleasePage(cm_dirOp_t * op, cm_buf_t ** bufferpp, int modified)
         return EINVAL;
 
     cm_DirOpDelBuffer(op, *bufferpp,
-                      ((modified ? DIROP_MODIFIED : 0) | DIROP_SCPLOCKED));
+                      ((modified ? DIROP_MODIFIED : 0)));
     buf_Release(*bufferpp);
     *bufferpp = NULL;
 
@@ -1242,15 +1408,15 @@ cm_DirReleasePage(cm_dirOp_t * op, cm_buf_t ** bufferpp, int modified)
    should be released via cm_DirReleasePage().
 
    On entry:
-     scp->mx locked.
+     scp->mx unlocked.
      If *bufferpp is non-NULL, then *bufferpp->mx is locked.
 
    On exit:
-     scp->mxlocked
+     scp->mx unlocked
      If *bufferpp is non-NULL, then *bufferpp->mx is locked.
 
    During:
-     scp->mx will be released
+     scp->mx will be obtained and released
 
  */
 static long
@@ -1282,8 +1448,6 @@ cm_DirGetPage(cm_dirOp_t * op,
         thyper = bufferp->offset;
     }
 
-    lock_ReleaseMutex(&op->scp->mx);
-
     if (!bufferp || !LargeIntegerEqualTo(thyper, bufferOffset)) {
         /* wrong buffer */
 
@@ -1393,8 +1557,6 @@ cm_DirGetPage(cm_dirOp_t * op,
  _exit:
 
     *bufferpp = bufferp;
-    if (op->scp)
-        lock_ObtainMutex(&op->scp->mx);
 
     osi_Log1(afsd_logp, "cm_DirGetPage returning code 0x%x", code);
 
index 3a720ef..4af06f8 100644 (file)
@@ -101,9 +101,14 @@ typedef struct cm_dirOpBuffer {
 
 #define CM_DIROPBUFF_INUSE      0x1
 
+#define CM_DIRLOCK_NONE         0x0
+#define CM_DIRLOCK_READ         0x1
+#define CM_DIRLOCK_WRITE        0x2
+
 /* Used for managing transactional directory operations.  Each
    instance should only be used by one thread. */
 typedef struct cm_dirOp {
+    int           lockType;
     cm_scache_t * scp;
     cm_user_t *   userp;
     cm_req_t      req;
@@ -119,13 +124,13 @@ typedef struct cm_dirOp {
 
     afs_uint32    dirtyBufCount;
 
-    int           nBuffers;     /* number of buffers below */
+    afs_uint32    nBuffers;     /* number of buffers below */
     cm_dirOpBuffer_t buffers[CM_DIROP_MAXBUFFERS];
 } cm_dirOp_t;
 
 extern long
 cm_BeginDirOp(cm_scache_t * scp, cm_user_t * userp, cm_req_t * reqp,
-              cm_dirOp_t * op);
+              afs_uint32 lockType, cm_dirOp_t * op);
 
 extern int
 cm_CheckDirOpForSingleChange(cm_dirOp_t * op);
@@ -175,4 +180,6 @@ cm_DirEntryListAdd(char * namep, cm_dirEntryList_t ** list);
 extern void
 cm_DirEntryListFree(cm_dirEntryList_t ** list);
 
+extern void
+cm_DirDumpStats(void);
 #endif /*  __CM_DIR_ENV__ */
index 20c6d8e..ef814cd 100644 (file)
@@ -19,6 +19,7 @@
 #include <osi.h>
 
 #include "afsd.h"
+#include "cm_btree.h"
 
 /*extern void afsi_log(char *pattern, ...);*/
 
@@ -205,6 +206,20 @@ long cm_RecycleSCache(cm_scache_t *scp, afs_int32 flags)
      * while we hold the global refcount lock.
      */
     cm_FreeAllACLEnts(scp);
+
+#ifdef USE_BPLUS
+    /* destroy directory Bplus Tree */
+    if (scp->dirBplus) {
+        LARGE_INTEGER start, end;
+        QueryPerformanceCounter(&start);
+        bplus_free_tree++;
+        freeBtree(scp->dirBplus);
+        scp->dirBplus = NULL;
+        QueryPerformanceCounter(&end);
+
+        bplus_free_time += (end.QuadPart - start.QuadPart);
+    }
+#endif
     return 0;
 }
 
@@ -297,6 +312,9 @@ cm_scache_t *cm_GetNewSCache(void)
     scp->magic = CM_SCACHE_MAGIC;
     lock_InitializeMutex(&scp->mx, "cm_scache_t mutex");
     lock_InitializeRWLock(&scp->bufCreateLock, "cm_scache_t bufCreateLock");
+#ifdef USE_BPLUS
+    lock_InitializeRWLock(&scp->dirlock, "cm_scache_t dirlock");
+#endif
     scp->serverLock = -1;
 
     /* and put it in the LRU queue */
@@ -492,6 +510,13 @@ cm_ShutdownSCache(void)
         scp->cbExpires = 0;
         scp->flags &= ~CM_SCACHEFLAG_CALLBACK;
 
+#ifdef USE_BPLUS
+        if (scp->dirBplus)
+            freeBtree(scp->dirBplus);
+        scp->dirBplus = NULL;
+        scp->dirDataVersion = -1;
+        lock_FinalizeRWLock(&scp->dirlock);
+#endif
         lock_FinalizeMutex(&scp->mx);
         lock_FinalizeRWLock(&scp->bufCreateLock);
     }
@@ -521,7 +546,9 @@ void cm_InitSCache(int newFile, long maxSCaches)
                   scp = scp->allNextp ) {
                 lock_InitializeMutex(&scp->mx, "cm_scache_t mutex");
                 lock_InitializeRWLock(&scp->bufCreateLock, "cm_scache_t bufCreateLock");
-
+#ifdef USE_BPLUS
+                lock_InitializeRWLock(&scp->dirlock, "cm_scache_t dirlock");
+#endif
                 scp->cbServerp = NULL;
                 scp->cbExpires = 0;
                 scp->fileLocksH = NULL;
@@ -535,6 +562,10 @@ void cm_InitSCache(int newFile, long maxSCaches)
                 scp->openShares = 0;
                 scp->openExcls = 0;
                 scp->waitCount = 0;
+#ifdef USE_BPLUS
+                scp->dirBplus = NULL;
+                scp->dirDataVersion = -1;
+#endif
                 scp->flags &= ~CM_SCACHEFLAG_WAITING;
             }
         }
index 0d28fbf..34cda77 100644 (file)
@@ -189,6 +189,13 @@ typedef struct cm_scache {
     /* bulk stat progress */
     osi_hyper_t bulkStatProgress;      /* track bulk stats of large dirs */
 
+#ifdef USE_BPLUS
+    /* directory B+ tree */             /* only allocated if is directory */
+    osi_rwlock_t dirlock;               /* controls access to dirBplus */
+    afs_uint32   dirDataVersion;        /* data version represented by dirBplus */
+    struct tree *dirBplus;              /* dirBplus */
+#endif
+
     /* open state */
     afs_uint16 openReads;              /* open for reading */
     afs_uint16 openWrites;             /* open for writing */
index acf08b0..090e3e1 100644 (file)
@@ -13,6 +13,7 @@
 #include <errno.h>
 #include <windows.h>
 #include <winsock2.h>
+#include <afs/unified_afs.h>
 #ifndef EWOULDBLOCK
 #define EWOULDBLOCK             WSAEWOULDBLOCK
 #define EINPROGRESS             WSAEINPROGRESS
@@ -41,6 +42,9 @@
 #define ETOOMANYREFS            WSAETOOMANYREFS
 #define ETIMEDOUT               WSAETIMEDOUT
 #define ECONNREFUSED            WSAECONNREFUSED
+#ifdef ELOOP
+#undef ELOOP
+#endif
 #define ELOOP                   WSAELOOP
 #ifdef ENAMETOOLONG
 #undef ENAMETOOLONG
@@ -58,7 +62,6 @@
 #define ESTALE                  WSAESTALE
 #define EREMOTE                 WSAEREMOTE
 #endif /* EWOULDBLOCK */
-#include <afs/unified_afs.h>
 
 #include <string.h>
 #include <malloc.h>
index ffcc3ff..71e67e7 100644 (file)
@@ -21,6 +21,7 @@
 #include <osi.h>
 
 #include "afsd.h"
+#include "cm_btree.h"
 
 #ifdef DEBUG
 extern void afsi_log(char *pattern, ...);
@@ -588,15 +589,12 @@ long cm_ApplyDir(cm_scache_t *scp, cm_DirFuncp_t funcp, void *parmp,
     lock_ObtainMutex(&scp->mx);
     code = cm_SyncOp(scp, NULL, userp, reqp, PRSFS_LOOKUP,
                       CM_SCACHESYNC_NEEDCALLBACK | CM_SCACHESYNC_GETSTATUS);
-    if (code) {
-        lock_ReleaseMutex(&scp->mx);
+    lock_ReleaseMutex(&scp->mx);
+    if (code)
         return code;
-    }
         
-    if (scp->fileType != CM_SCACHETYPE_DIRECTORY) {
-        lock_ReleaseMutex(&scp->mx);
+    if (scp->fileType != CM_SCACHETYPE_DIRECTORY)
         return CM_ERROR_NOTDIR;
-    }   
 
     if (retscp)                        /* if this is a lookup call */
     {
@@ -613,28 +611,39 @@ long cm_ApplyDir(cm_scache_t *scp, cm_DirFuncp_t funcp, void *parmp,
 #else /* !AFS_FREELANCE_CLIENT */
             TRUE
 #endif
-            ) {
-        int casefold = sp->caseFold;
-        sp->caseFold = 0; /* we have a strong preference for exact matches */
-        if ( *retscp = cm_dnlcLookup(scp, sp)) /* dnlc hit */
+            ) 
         {
+            int casefold = sp->caseFold;
+            sp->caseFold = 0; /* we have a strong preference for exact matches */
+            if ( *retscp = cm_dnlcLookup(scp, sp))     /* dnlc hit */
+            {
+                sp->caseFold = casefold;
+                return 0;
+            }
             sp->caseFold = casefold;
-            lock_ReleaseMutex(&scp->mx);
-            return 0;
-        }
-        sp->caseFold = casefold;
 
             /* see if we can find it using the directory hash tables.
                we can only do exact matches, since the hash is case
                sensitive. */
             {
                 cm_dirOp_t dirop;
+#ifdef USE_BPLUS
+                int usedBplus = 0;
+#endif
 
                 code = ENOENT;
 
-                code = cm_BeginDirOp(scp, userp, reqp, &dirop);
+                code = cm_BeginDirOp(scp, userp, reqp, CM_DIRLOCK_READ, &dirop);
                 if (code == 0) {
-                    code = cm_DirLookup(&dirop, sp->searchNamep, &sp->fid);
+
+#ifdef USE_BPLUS
+                    code = cm_BPlusDirLookup(&dirop, sp->searchNamep, &sp->fid);
+                    if (code != EINVAL)
+                        usedBplus = 1;
+                    else 
+#endif
+                        code = cm_DirLookup(&dirop, sp->searchNamep, &sp->fid);
+
                     cm_EndDirOp(&dirop);
                 }
 
@@ -642,14 +651,24 @@ long cm_ApplyDir(cm_scache_t *scp, cm_DirFuncp_t funcp, void *parmp,
                     /* found it */
                     sp->found = TRUE;
                     sp->ExactFound = TRUE;
-                    lock_ReleaseMutex(&scp->mx);
-
                     *retscp = NULL; /* force caller to call cm_GetSCache() */
-
                     return 0;
                 }
+#ifdef USE_BPLUS
+                if (usedBplus) {
+                    if (sp->caseFold && code == CM_ERROR_INEXACT_MATCH) {
+                        /* found it */
+                        sp->found = TRUE;
+                        sp->ExactFound = FALSE;
+                        *retscp = NULL; /* force caller to call cm_GetSCache() */
+                        return 0;
+                    }
+                    
+                    return CM_ERROR_NOSUCHFILE;
+                }
+#endif 
             }
-    }
+        }
     }  
 
     /*
@@ -658,8 +677,6 @@ long cm_ApplyDir(cm_scache_t *scp, cm_DirFuncp_t funcp, void *parmp,
      */
     dirLength = scp->length;
 
-    lock_ReleaseMutex(&scp->mx);
-
     bufferp = NULL;
     bufferOffset.LowPart = bufferOffset.HighPart = 0;
     if (startOffsetp)
@@ -1174,22 +1191,40 @@ long cm_LookupInternal(cm_scache_t *dscp, char *namep, long flags, cm_user_t *us
            not. */
 
         cm_dirOp_t dirop;
+#ifdef USE_BPLUS
+        int usedBplus = 0;
+#endif
 
-        lock_ObtainMutex(&dscp->mx);
-
-        code = cm_BeginDirOp(dscp, userp, reqp, &dirop);
+        code = cm_BeginDirOp(dscp, userp, reqp, CM_DIRLOCK_READ, &dirop);
         if (code == 0) {
-            code = cm_DirLookup(&dirop, namep, &rock.fid);
+#ifdef USE_BPLUS
+            code = cm_BPlusDirLookup(&dirop, namep, &rock.fid);
+            if (code != EINVAL)
+                usedBplus = 1;
+            else
+#endif
+                code = cm_DirLookup(&dirop, namep, &rock.fid);
+
             cm_EndDirOp(&dirop);
         }
 
-        lock_ReleaseMutex(&dscp->mx);
-
         if (code == 0) {
             /* found it */
             rock.found = TRUE;
             goto haveFid;
         }
+#ifdef USE_BPLUS
+        if (usedBplus) {
+            if (code == CM_ERROR_INEXACT_MATCH && (flags & CM_FLAG_CASEFOLD)) {
+                /* found it */
+                code = 0;
+                rock.found = TRUE;
+                goto haveFid;
+            }
+            
+            return CM_ERROR_NOSUCHFILE;
+        }
+#endif
     }
 
     rock.fid.cell = dscp->fid.cell;
@@ -1549,10 +1584,9 @@ long cm_Unlink(cm_scache_t *dscp, char *namep, cm_user_t *userp, cm_req_t *reqp)
 #endif  
 
     /* make sure we don't screw up the dir status during the merge */
-    lock_ObtainMutex(&dscp->mx);
-
-    code = cm_BeginDirOp(dscp, userp, reqp, &dirop);
+    code = cm_BeginDirOp(dscp, userp, reqp, CM_DIRLOCK_NONE, &dirop);
 
+    lock_ObtainMutex(&dscp->mx);
     sflags = CM_SCACHESYNC_STOREDATA;
     code = cm_SyncOp(dscp, NULL, userp, reqp, 0, sflags);
     lock_ReleaseMutex(&dscp->mx);
@@ -1585,14 +1619,13 @@ long cm_Unlink(cm_scache_t *dscp, char *namep, cm_user_t *userp, cm_req_t *reqp)
     else
         osi_Log0(afsd_logp, "CALL RemoveFile SUCCESS");
 
+    lock_ObtainWrite(&dscp->dirlock);
+    dirop.lockType = CM_DIRLOCK_WRITE;
     lock_ObtainMutex(&dscp->mx);
     cm_dnlcRemove(dscp, namep);
     cm_SyncOpDone(dscp, NULL, sflags);
     if (code == 0) {
         cm_MergeStatus(NULL, dscp, &newDirStatus, &volSync, userp, 0);
-        if (cm_CheckDirOpForSingleChange(&dirop)) {
-            cm_DirDeleteEntry(&dirop, namep);
-        }
     } else if (code == CM_ERROR_NOSUCHFILE) {
        /* windows would not have allowed the request to delete the file 
         * if it did not believe the file existed.  therefore, we must 
@@ -1600,9 +1633,16 @@ long cm_Unlink(cm_scache_t *dscp, char *namep, cm_user_t *userp, cm_req_t *reqp)
         */
        dscp->cbServerp = NULL;
     }
-    cm_EndDirOp(&dirop);
     lock_ReleaseMutex(&dscp->mx);
 
+    if (code == 0 && cm_CheckDirOpForSingleChange(&dirop)) {
+        cm_DirDeleteEntry(&dirop, namep);
+#ifdef USE_BPLUS
+        cm_BPlusDirDeleteEntry(&dirop, namep);
+#endif
+    }
+    cm_EndDirOp(&dirop);
+
     return code;
 }
 
@@ -2600,15 +2640,15 @@ long cm_Create(cm_scache_t *dscp, char *namep, long flags, cm_attr_t *attrp,
      * that someone who does a chmod will know to wait until our call
      * completes.
      */
+    cm_BeginDirOp(dscp, userp, reqp, CM_DIRLOCK_NONE, &dirop);
     lock_ObtainMutex(&dscp->mx);
-    cm_BeginDirOp(dscp, userp, reqp, &dirop);
     code = cm_SyncOp(dscp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA);
+    lock_ReleaseMutex(&dscp->mx);
     if (code == 0) {
         cm_StartCallbackGrantingCall(NULL, &cbReq);
     } else {
         cm_EndDirOp(&dirop);
     }
-    lock_ReleaseMutex(&dscp->mx);
     if (code) {
         return code;
     }
@@ -2643,6 +2683,8 @@ long cm_Create(cm_scache_t *dscp, char *namep, long flags, cm_attr_t *attrp,
     else
         osi_Log0(afsd_logp, "CALL CreateFile SUCCESS");
 
+    lock_ObtainWrite(&dscp->dirlock);
+    dirop.lockType = CM_DIRLOCK_WRITE;
     lock_ObtainMutex(&dscp->mx);
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     if (code == 0) {
@@ -2680,12 +2722,13 @@ long cm_Create(cm_scache_t *dscp, char *namep, long flags, cm_attr_t *attrp,
     if (!didEnd)
         cm_EndCallbackGrantingCall(NULL, &cbReq, NULL, 0);
 
-    lock_ObtainMutex(&dscp->mx);
     if (scp && cm_CheckDirOpForSingleChange(&dirop)) {
         cm_DirCreateEntry(&dirop, namep, &newFid);
+#ifdef USE_BPLUS
+        cm_BPlusDirCreateEntry(&dirop, namep, &newFid);
+#endif
     }
     cm_EndDirOp(&dirop);
-    lock_ReleaseMutex(&dscp->mx);
 
     return code;
 }       
@@ -2745,15 +2788,15 @@ long cm_MakeDir(cm_scache_t *dscp, char *namep, long flags, cm_attr_t *attrp,
      * data, so that someone who does a chmod on the dir will wait until
      * our call completes.
      */
+    cm_BeginDirOp(dscp, userp, reqp, CM_DIRLOCK_NONE, &dirop);
     lock_ObtainMutex(&dscp->mx);
-    cm_BeginDirOp(dscp, userp, reqp, &dirop);
     code = cm_SyncOp(dscp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA);
+    lock_ReleaseMutex(&dscp->mx);
     if (code == 0) {
         cm_StartCallbackGrantingCall(NULL, &cbReq);
     } else {
         cm_EndDirOp(&dirop);
     }
-    lock_ReleaseMutex(&dscp->mx);
     if (code) {
         return code;
     }
@@ -2788,6 +2831,8 @@ long cm_MakeDir(cm_scache_t *dscp, char *namep, long flags, cm_attr_t *attrp,
     else
         osi_Log0(afsd_logp, "CALL MakeDir SUCCESS");
 
+    lock_ObtainWrite(&dscp->dirlock);
+    dirop.lockType = CM_DIRLOCK_WRITE;
     lock_ObtainMutex(&dscp->mx);
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     if (code == 0) {
@@ -2824,12 +2869,13 @@ long cm_MakeDir(cm_scache_t *dscp, char *namep, long flags, cm_attr_t *attrp,
     if (!didEnd)
         cm_EndCallbackGrantingCall(NULL, &cbReq, NULL, 0);
 
-    lock_ObtainMutex(&dscp->mx);
     if (scp && cm_CheckDirOpForSingleChange(&dirop)) {
         cm_DirCreateEntry(&dirop, namep, &newFid);
+#ifdef USE_BPLUS
+        cm_BPlusDirCreateEntry(&dirop, namep, &newFid);
+#endif
     }
     cm_EndDirOp(&dirop);
-    lock_ReleaseMutex(&dscp->mx);
 
     /* and return error code */
     return code;
@@ -2853,12 +2899,12 @@ long cm_Link(cm_scache_t *dscp, char *namep, cm_scache_t *sscp, long flags,
         return CM_ERROR_CROSSDEVLINK;
     }
 
+    cm_BeginDirOp(dscp, userp, reqp, CM_DIRLOCK_NONE, &dirop);
     lock_ObtainMutex(&dscp->mx);
-    cm_BeginDirOp(dscp, userp, reqp, &dirop);
     code = cm_SyncOp(dscp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA);
+    lock_ReleaseMutex(&dscp->mx);
     if (code != 0)
         cm_EndDirOp(&dirop);
-    lock_ReleaseMutex(&dscp->mx);
 
     if (code)
         return code;
@@ -2893,12 +2939,17 @@ long cm_Link(cm_scache_t *dscp, char *namep, cm_scache_t *sscp, long flags,
     else
         osi_Log0(afsd_logp, "CALL Link SUCCESS");
 
+    lock_ObtainWrite(&dscp->dirlock);
+    dirop.lockType = CM_DIRLOCK_WRITE;
     lock_ObtainMutex(&dscp->mx);
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     if (code == 0) {
         cm_MergeStatus(NULL, dscp, &updatedDirStatus, &volSync, userp, 0);
         if (cm_CheckDirOpForSingleChange(&dirop)) {
             cm_DirCreateEntry(&dirop, namep, &sscp->fid);
+#ifdef USE_BPLUS
+            cm_BPlusDirCreateEntry(&dirop, namep, &sscp->fid);
+#endif
         }
     }
     cm_EndDirOp(&dirop);
@@ -2927,12 +2978,12 @@ long cm_SymLink(cm_scache_t *dscp, char *namep, char *contentsp, long flags,
      * so that someone who does a chmod on the dir will wait until our
      * call completes.
      */
+    cm_BeginDirOp(dscp, userp, reqp, CM_DIRLOCK_NONE, &dirop);
     lock_ObtainMutex(&dscp->mx);
-    cm_BeginDirOp(dscp, userp, reqp, &dirop);
     code = cm_SyncOp(dscp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA);
+    lock_ReleaseMutex(&dscp->mx);
     if (code != 0)
         cm_EndDirOp(&dirop);
-    lock_ReleaseMutex(&dscp->mx);
     if (code) {
         return code;
     }
@@ -2965,6 +3016,8 @@ long cm_SymLink(cm_scache_t *dscp, char *namep, char *contentsp, long flags,
     else
         osi_Log0(afsd_logp, "CALL Symlink SUCCESS");
 
+    lock_ObtainWrite(&dscp->dirlock);
+    dirop.lockType = CM_DIRLOCK_WRITE;
     lock_ObtainMutex(&dscp->mx);
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     if (code == 0) {
@@ -2976,6 +3029,9 @@ long cm_SymLink(cm_scache_t *dscp, char *namep, char *contentsp, long flags,
             newFid.unique = newAFSFid.Unique;
 
             cm_DirCreateEntry(&dirop, namep, &newFid);
+#ifdef USE_BPLUS
+            cm_BPlusDirCreateEntry(&dirop, namep, &newFid);
+#endif
         }
     }
     cm_EndDirOp(&dirop);
@@ -3017,21 +3073,20 @@ long cm_RemoveDir(cm_scache_t *dscp, char *namep, cm_user_t *userp,
     AFSFetchStatus updatedDirStatus;
     AFSVolSync volSync;
     struct rx_connection * callp;
-    cm_dirOp_t dirOp;
+    cm_dirOp_t dirop;
 
     /* before starting the RPC, mark that we're changing the directory data,
      * so that someone who does a chmod on the dir will wait until our
      * call completes.
      */
+    cm_BeginDirOp(dscp, userp, reqp, CM_DIRLOCK_NONE, &dirop);
     lock_ObtainMutex(&dscp->mx);
-    cm_BeginDirOp(dscp, userp, reqp, &dirOp);
     code = cm_SyncOp(dscp, NULL, userp, reqp, 0, CM_SCACHESYNC_STOREDATA);
+    lock_ReleaseMutex(&dscp->mx);
     if (code) {
-        cm_EndDirOp(&dirOp);
-        lock_ReleaseMutex(&dscp->mx);
+        cm_EndDirOp(&dirop);
         return code;
     }
-    lock_ReleaseMutex(&dscp->mx);
     didEnd = 0;
 
     /* try the RPC now */
@@ -3059,18 +3114,26 @@ long cm_RemoveDir(cm_scache_t *dscp, char *namep, cm_user_t *userp,
     else
         osi_Log0(afsd_logp, "CALL RemoveDir SUCCESS");
 
+    lock_ObtainWrite(&dscp->dirlock);
+    dirop.lockType = CM_DIRLOCK_WRITE;
     lock_ObtainMutex(&dscp->mx);
     cm_SyncOpDone(dscp, NULL, CM_SCACHESYNC_STOREDATA);
     if (code == 0) {
         cm_dnlcRemove(dscp, namep); 
         cm_MergeStatus(NULL, dscp, &updatedDirStatus, &volSync, userp, 0);
-        if (cm_CheckDirOpForSingleChange(&dirOp)) {
-            cm_DirDeleteEntry(&dirOp, namep);
-    }
     }
-    cm_EndDirOp(&dirOp);
     lock_ReleaseMutex(&dscp->mx);
 
+    if (code == 0) {
+        if (cm_CheckDirOpForSingleChange(&dirop)) {
+            cm_DirDeleteEntry(&dirop, namep);
+#ifdef USE_BPLUS
+            cm_BPlusDirDeleteEntry(&dirop, namep);
+#endif
+        }
+    }
+    cm_EndDirOp(&dirop);
+
     /* and return error code */
     return code;
 }
@@ -3122,16 +3185,16 @@ long cm_Rename(cm_scache_t *oldDscp, char *oldNamep, cm_scache_t *newDscp,
             return CM_ERROR_RENAME_IDENTICAL;
 
         oneDir = 1;
+        cm_BeginDirOp(oldDscp, userp, reqp, CM_DIRLOCK_NONE, &oldDirOp);
         lock_ObtainMutex(&oldDscp->mx);
         cm_dnlcRemove(oldDscp, oldNamep);
         cm_dnlcRemove(oldDscp, newNamep);
-        cm_BeginDirOp(oldDscp, userp, reqp, &oldDirOp);
         code = cm_SyncOp(oldDscp, NULL, userp, reqp, 0,
                           CM_SCACHESYNC_STOREDATA);
+        lock_ReleaseMutex(&oldDscp->mx);
         if (code != 0) {
             cm_EndDirOp(&oldDirOp);
         }
-        lock_ReleaseMutex(&oldDscp->mx);
     }
     else {
         /* two distinct dir vnodes */
@@ -3148,59 +3211,59 @@ long cm_Rename(cm_scache_t *oldDscp, char *oldNamep, cm_scache_t *newDscp,
             return CM_ERROR_CROSSDEVLINK;
 
         if (oldDscp->fid.vnode < newDscp->fid.vnode) {
+            cm_BeginDirOp(oldDscp, userp, reqp, CM_DIRLOCK_NONE, &oldDirOp);
             lock_ObtainMutex(&oldDscp->mx);
-            cm_BeginDirOp(oldDscp, userp, reqp, &oldDirOp);
             cm_dnlcRemove(oldDscp, oldNamep);
             code = cm_SyncOp(oldDscp, NULL, userp, reqp, 0,
                               CM_SCACHESYNC_STOREDATA);
+            lock_ReleaseMutex(&oldDscp->mx);
             if (code != 0)
                 cm_EndDirOp(&oldDirOp);
-            lock_ReleaseMutex(&oldDscp->mx);
             if (code == 0) {
+                cm_BeginDirOp(newDscp, userp, reqp, CM_DIRLOCK_NONE, &newDirOp);
                 lock_ObtainMutex(&newDscp->mx);
-                cm_BeginDirOp(newDscp, userp, reqp, &newDirOp);
                 cm_dnlcRemove(newDscp, newNamep);
                 code = cm_SyncOp(newDscp, NULL, userp, reqp, 0,
                                   CM_SCACHESYNC_STOREDATA);
-                if (code != 0)
-                    cm_EndDirOp(&newDirOp);
                 lock_ReleaseMutex(&newDscp->mx);
                 if (code) {
+                    cm_EndDirOp(&newDirOp);
+
                     /* cleanup first one */
                     lock_ObtainMutex(&oldDscp->mx);
                     cm_SyncOpDone(oldDscp, NULL,
                                    CM_SCACHESYNC_STOREDATA);
-                    cm_EndDirOp(&oldDirOp);
                     lock_ReleaseMutex(&oldDscp->mx);
+                    cm_EndDirOp(&oldDirOp);
                 }       
             }
         }
         else {
             /* lock the new vnode entry first */
+            cm_BeginDirOp(newDscp, userp, reqp, CM_DIRLOCK_NONE, &newDirOp);
             lock_ObtainMutex(&newDscp->mx);
-            cm_BeginDirOp(newDscp, userp, reqp, &newDirOp);
             cm_dnlcRemove(newDscp, newNamep);
             code = cm_SyncOp(newDscp, NULL, userp, reqp, 0,
                               CM_SCACHESYNC_STOREDATA);
+            lock_ReleaseMutex(&newDscp->mx);
             if (code != 0)
                 cm_EndDirOp(&newDirOp);
-            lock_ReleaseMutex(&newDscp->mx);
             if (code == 0) {
+                cm_BeginDirOp(oldDscp, userp, reqp, CM_DIRLOCK_NONE, &oldDirOp);
                 lock_ObtainMutex(&oldDscp->mx);
-                cm_BeginDirOp(oldDscp, userp, reqp, &oldDirOp);
                 cm_dnlcRemove(oldDscp, oldNamep);
                 code = cm_SyncOp(oldDscp, NULL, userp, reqp, 0,
                                   CM_SCACHESYNC_STOREDATA);
+                lock_ReleaseMutex(&oldDscp->mx);
                 if (code != 0)
                     cm_EndDirOp(&oldDirOp);
-                lock_ReleaseMutex(&oldDscp->mx);
                 if (code) {
                     /* cleanup first one */
                     lock_ObtainMutex(&newDscp->mx);
                     cm_SyncOpDone(newDscp, NULL,
                                    CM_SCACHESYNC_STOREDATA);
-                    cm_EndDirOp(&newDirOp);
                     lock_ReleaseMutex(&newDscp->mx);
+                    cm_EndDirOp(&newDirOp);
                 }       
             }
         }
@@ -3243,48 +3306,69 @@ long cm_Rename(cm_scache_t *oldDscp, char *oldNamep, cm_scache_t *newDscp,
         osi_Log0(afsd_logp, "CALL Rename SUCCESS");
 
     /* update the individual stat cache entries for the directories */
+    lock_ObtainWrite(&oldDscp->dirlock);
+    oldDirOp.lockType = CM_DIRLOCK_WRITE;
     lock_ObtainMutex(&oldDscp->mx);
     cm_SyncOpDone(oldDscp, NULL, CM_SCACHESYNC_STOREDATA);
 
-    if (code == 0) {
+    if (code == 0)
         cm_MergeStatus(NULL, oldDscp, &updatedOldDirStatus, &volSync,
                         userp, 0);
+    lock_ReleaseMutex(&oldDscp->mx);
 
+    if (code = 0) {
         if (cm_CheckDirOpForSingleChange(&oldDirOp)) {
 
-            diropCode = cm_DirLookup(&oldDirOp, oldNamep, &fileFid);
+#ifdef USE_BPLUS
+            diropCode = cm_BPlusDirLookup(&oldDirOp, oldNamep, &fileFid);
+            if (diropCode == CM_ERROR_INEXACT_MATCH)
+                diropCode = 0;
+            else if (diropCode == EINVAL)
+#endif
+                diropCode = cm_DirLookup(&oldDirOp, oldNamep, &fileFid);
 
             if (diropCode == 0) {
                 if (oneDir) {
                     diropCode = cm_DirCreateEntry(&oldDirOp, newNamep, &fileFid);
+#ifdef USE_BPLUS
+                    cm_BPlusDirCreateEntry(&oldDirOp, newNamep, &fileFid);
+#endif
                 }
 
-                if (diropCode == 0) {
+                if (diropCode == 0) { 
                     diropCode = cm_DirDeleteEntry(&oldDirOp, oldNamep);
-    }
+#ifdef USE_BPLUS
+                    cm_BPlusDirDeleteEntry(&oldDirOp, oldNamep);
+#endif
+                }
             }
         }
     }
     cm_EndDirOp(&oldDirOp);
-    lock_ReleaseMutex(&oldDscp->mx);
 
     /* and update it for the new one, too, if necessary */
     if (!oneDir) {
+        lock_ObtainWrite(&newDscp->dirlock);
+        newDirOp.lockType = CM_DIRLOCK_WRITE;
         lock_ObtainMutex(&newDscp->mx);
         cm_SyncOpDone(newDscp, NULL, CM_SCACHESYNC_STOREDATA);
-        if (code == 0) {
+        if (code == 0)
             cm_MergeStatus(NULL, newDscp, &updatedNewDirStatus, &volSync,
                             userp, 0);
+        lock_ReleaseMutex(&newDscp->mx);
 
+        if (code == 0) {
             /* we only make the local change if we successfully made
                the change in the old directory AND there was only one
                change in the new directory */
             if (diropCode == 0 && cm_CheckDirOpForSingleChange(&newDirOp)) {
                 cm_DirCreateEntry(&newDirOp, newNamep, &fileFid);
+#ifdef USE_BPLUS
+                cm_BPlusDirCreateEntry(&newDirOp, newNamep, &fileFid);
+#endif
             }
         }
         cm_EndDirOp(&newDirOp);
-        lock_ReleaseMutex(&newDscp->mx);
     }
 
     /* and return error code */
index a9dc5a0..6ac2202 100644 (file)
@@ -15,6 +15,8 @@ The locking/waiting hierarchy here is this:
 
  - smb_fid_t mutex
 
+ - scache dirlock
+
  - scache flags (storing, fetching, etc); if set for more than one
        vnode in a volume, must be set in vnode order (see cm_Rename).