Parallel I/O extensions to namei backend
authorAndrew Deason <adeason@sinenomine.net>
Thu, 11 Mar 2010 18:19:47 +0000 (12:19 -0600)
committerDerrick Brashear <shadow@dementia.org>
Thu, 14 Oct 2010 13:49:23 +0000 (06:49 -0700)
This adds the ability for certain namei operations (currently only
ListViceInodes) to occur across multiple different threads in
parallel. Currently this is only enabled when built with the
not-yet-existant AFS_SALSRV_ENV.

Originally written by Tom Keiser.

Change-Id: I392653670378dbca3007e98a0cb09fe4474dd262
Reviewed-on: http://gerrit.openafs.org/1864
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Derrick Brashear <shadow@dementia.org>

src/vol/namei_ops.c
src/vol/namei_ops.h

index b250a53..7b505c2 100644 (file)
 #define   LOCK_UN   8    /* unlock */
 #endif
 
+#ifdef AFS_SALSRV_ENV
+#include <pthread.h>
+#include <afs/work_queue.h>
+#include <afs/thread_pool.h>
+#include <vol/vol-salvage.h>
+#endif
+
 #ifndef HAVE_FLOCK
 #include <fcntl.h>
 
@@ -955,6 +962,15 @@ namei_copy_on_write(IHandle_t *h)
 #define LINKTABLE_WIDTH 2
 #define LINKTABLE_SHIFT 1      /* log 2 = 1 */
 
+/**
+ * compute namei link table file and bit offset from inode number.
+ *
+ * @param[in]   ino     inode number
+ * @param[out]  offset  link table file offset
+ * @param[out]  index   bit offset within 2-byte record
+ *
+ * @internal
+ */
 static void
 namei_GetLCOffsetAndIndexFromIno(Inode ino, afs_foff_t * offset, int *index)
 {
@@ -965,10 +981,33 @@ namei_GetLCOffsetAndIndexFromIno(Inode ino, afs_foff_t * offset, int *index)
     *index = (tindex << 1) + tindex;
 }
 
+#ifdef AFS_PTHREAD_ENV
+/* XXX do static initializers work for WINNT/pthread? */
+pthread_mutex_t _namei_glc_lock = PTHREAD_MUTEX_INITIALIZER;
+#define NAMEI_GLC_LOCK assert(pthread_mutex_lock(&_namei_glc_lock) == 0)
+#define NAMEI_GLC_UNLOCK assert(pthread_mutex_unlock(&_namei_glc_lock) == 0)
+#else /* !AFS_PTHREAD_ENV */
+#define NAMEI_GLC_LOCK
+#define NAMEI_GLC_UNLOCK
+#endif /* !AFS_PTHREAD_ENV */
 
-/* namei_GetLinkCount
- * If lockit is set, lock the file and leave it locked upon a successful
- * return.
+/**
+ * get the link count of an inode.
+ *
+ * @param[in]  h        namei link count table file handle
+ * @param[in]  ino      inode number for which we are requesting a link count
+ * @param[in]  lockit   if asserted, return with lock held on link table file
+ * @param[in]  fixup    if asserted, write 1 to link count when read() returns
+ *                      zero (at EOF)
+ * @param[in]  nowrite  return success on zero byte read or ZLC
+ *
+ * @post if lockit asserted and lookup was successful, will return with write
+ *       lock on link table file descriptor
+ *
+ * @return link count
+ *    @retval -1 namei link table i/o error
+ *
+ * @internal
  */
 static int
 namei_GetLinkCount2(FdHandle_t * h, Inode ino, int lockit, int fixup, int nowrite)
@@ -992,20 +1031,41 @@ namei_GetLinkCount2(FdHandle_t * h, Inode ino, int lockit, int fixup, int nowrit
     if ((rc == 0 || !((row >> index) & NAMEI_TAGMASK)) && fixup && nowrite)
         return 1;
     if (rc == 0 && fixup) {
+       /*
+        * extend link table and write a link count of 1 for ino
+        *
+        * in order to make MT-safe, truncation (extension really)
+        * must happen under a mutex
+        */
         struct stat st;
-        if (fstat(h->fd_fd, &st) || st.st_size >= offset+sizeof(row))
-          goto bad_getLinkByte;
+       NAMEI_GLC_LOCK;
+        if (fstat(h->fd_fd, &st) || st.st_size >= offset+sizeof(row)) {
+           NAMEI_GLC_UNLOCK;
+           goto bad_getLinkByte;
+       }
         FDH_TRUNC(h, offset+sizeof(row));
         row = 1 << index;
        rc = FDH_PWRITE(h, (char *)&row, sizeof(row), offset);
+       NAMEI_GLC_UNLOCK;
     }
     if (rc != sizeof(row)) {
        goto bad_getLinkByte;
     }
 
     if (fixup && !((row >> index) & NAMEI_TAGMASK)) {
-        row |= 1<<index;
-       rc = FDH_PWRITE(h, (char *)&row, sizeof(row), offset);
+       /*
+        * fix up zlc
+        *
+        * in order to make this mt-safe, we need to do the read-modify-write
+        * under a mutex.  thus, we repeat the read inside the lock.
+        */
+       NAMEI_GLC_LOCK;
+       rc = FDH_PREAD(h, (char *)&row, sizeof(row), offset);
+       if (rc == sizeof(row)) {
+           row |= 1<<index;
+           rc = FDH_PWRITE(h, (char *)&row, sizeof(row), offset);
+       }
+       NAMEI_GLC_UNLOCK;
         if (rc != sizeof(row))
            goto bad_getLinkByte;
     }
@@ -1182,18 +1242,31 @@ VerifyDirPerms(char *path)
     }
 }
 
-/* ListViceInodes
+/**
  * Fill the results file with the requested inode information.
  *
- * Return values:
- *  0 - success
- * -1 - complete failure, salvage should terminate.
- * -2 - not enough space on partition, salvager has error message for this.
- *
  * This code optimizes single volume salvages by just looking at that one
  * volume's directory.
  *
- * If the resultFile is NULL, then don't call the write routine.
+ * @param[in]   devname             device name string
+ * @param[in]   moutnedOn           vice partition mount point
+ * @param[in]   resultFile          result file in which to write inode
+ *                                  metadata.  If NULL, write routine is not
+ *                                  called.
+ * @param[in]   judgeInode          filter function pointer.  if not NULL, only
+ *                                  inodes for which this routine returns non-
+ *                                  zero will be written to the results file.
+ * @param[in]   singleVolumeNumber  volume id filter
+ * @param[out]  forcep              always set to 0 for namei impl
+ * @param[in]   forceR              not used by namei impl
+ * @param[in]   wpath               not used by namei impl
+ * @param[in]   rock                opaque pointer passed to judgeInode
+ *
+ * @return operation status
+ *    @retval 0   success
+ *    @retval -1  complete failure, salvage should terminate.
+ *    @retval -2  not enough space on partition, salvager has error message
+ *                for this.
  */
 int
 ListViceInodes(char *devname, char *mountedOn, FILE *inodeFile,
@@ -1248,17 +1321,31 @@ ListViceInodes(char *devname, char *mountedOn, FILE *inodeFile,
 }
 
 
-/* namei_ListAFSFiles
- *
+/**
  * Collect all the matching AFS files on the drive.
  * If singleVolumeNumber is non-zero, just return files for that volume.
  *
- * Returns <0 on error, else number of files found to match.
+ * @param[in] dev                 vice partition path
+ * @param[in] writeFun            function pointer to a function which
+ *                                writes inode information to FILE fp
+ * @param[in] fp                  file stream where inode metadata is sent
+ * @param[in] judgeFun            filter function pointer.  if not NULL,
+ *                                only entries for which a non-zero value
+ *                                is returned are written to fp
+ * @param[in] singleVolumeNumber  volume id filter.  if nonzero, only
+ *                                process files for that specific volume id
+ * @param[in] rock                opaque pointer passed into writeFun and
+ *                                judgeFun
+ *
+ * @return operation status
+ *    @retval <0 error
+ *    @retval >=0 number of matching files found
  */
 int
 namei_ListAFSFiles(char *dev,
                   int (*writeFun) (FILE *, struct ViceInodeInfo *, char *,
-                                   char *), FILE * fp,
+                                   char *),
+                  FILE * fp,
                   int (*judgeFun) (struct ViceInodeInfo *, afs_uint32, void *),
                   afs_uint32 singleVolumeNumber, void *rock)
 {
@@ -1317,30 +1404,533 @@ namei_ListAFSFiles(char *dev,
     return ninodes;
 }
 
+#ifdef DELETE_ZLC
+static void AddToZLCDeleteList(char dir, char *name);
+static void DeleteZLCFiles(char *path);
+#endif
+
+/**
+ * examine a namei volume special file.
+ *
+ * @param[in] path1               volume special directory path
+ * @param[in] dname               directory entry name
+ * @param[in] myIH                inode handle to volume directory
+ * @param[out] linkHandle         namei link count fd handle.  if
+ *                                the inode in question is the link
+ *                                table, then the FdHandle is populated
+ * @param[in] writeFun            metadata write function pointer
+ * @param[in] fp                  file pointer where inode metadata
+ *                                is written by (*writeFun)()
+ * @param[in] judgeFun            inode filter function pointer.  if
+ *                                not NULL, only inodes for which this
+ *                                function returns non-zero are recorded
+ *                                into fp by writeFun
+ * @param[in] singleVolumeNumer   volume id filter.  if non-zero, only
+ *                                inodes associated with this volume id
+ *                                are recorded by writeFun
+ * @param[in] rock                opaque pointer passed to writeFun and
+ *                                judgeFun
+ *
+ * @return operation status
+ *    @retval 1 count this inode
+ *    @retval 0 don't count this inode
+ *    @retval -1 failure
+ *
+ * @internal
+ */
+static int
+_namei_examine_special(char * path1,
+                      char * dname,
+                      IHandle_t * myIH,
+                      FdHandle_t * linkHandle,
+                      int (*writeFun) (FILE *, struct ViceInodeInfo *, char *,
+                                       char *),
+                      FILE * fp,
+                      int (*judgeFun) (struct ViceInodeInfo *, afs_uint32, void *),
+                      int singleVolumeNumber,
+                      void *rock)
+{
+    int ret = 0;
+    struct ViceInodeInfo info;
+
+    if (DecodeInode(path1, dname, &info, myIH->ih_vid) < 0) {
+       ret = 0;
+       goto error;
+    }
+
+    if (info.u.param[2] != VI_LINKTABLE) {
+       info.linkCount = 1;
+    } else {
+       char path2[512];
+       /* Open this handle */
+       (void)afs_snprintf(path2, sizeof(path2),
+                          "%s/%s", path1, dname);
+       linkHandle->fd_fd = afs_open(path2, Testing ? O_RDONLY : O_RDWR, 0666);
+       info.linkCount =
+           namei_GetLinkCount2(linkHandle, (Inode) 0, 1, 1, Testing);
+    }
+
+    if (!judgeFun ||
+       (*judgeFun) (&info, singleVolumeNumber, rock)) {
+       ret = 1;
+        if ((*writeFun) (fp, &info, path1, dname) < 0) {
+           ret = -1;
+       }
+    }
+
+ error:
+    return ret;
+}
+
+/**
+ * examine a namei file.
+ *
+ * @param[in] path1               volume special directory path
+ * @param[in] dname               directory entry name
+ * @param[in] myIH                inode handle to volume directory
+ * @param[in] linkHandle          namei link count fd handle.
+ * @param[in] writeFun            metadata write function pointer
+ * @param[in] fp                  file pointer where inode metadata
+ *                                is written by (*writeFun)()
+ * @param[in] judgeFun            inode filter function pointer.  if
+ *                                not NULL, only inodes for which this
+ *                                function returns non-zero are recorded
+ *                                into fp by writeFun
+ * @param[in] singleVolumeNumer   volume id filter.  if non-zero, only
+ *                                inodes associated with this volume id
+ *                                are recorded by writeFun
+ * @param[in] rock                opaque pointer passed to writeFun and
+ *                                judgeFun
+ *
+ * @return operation status
+ *    @retval 1 count this inode
+ *    @retval 0 don't count this inode
+ *    @retval -1 failure
+ *    @retval -2 request ZLC delete
+ *
+ * @internal
+ */
+static int
+_namei_examine_reg(char * path3,
+                  char * dname,
+                  IHandle_t * myIH,
+                  FdHandle_t * linkHandle,
+                  int (*writeFun) (FILE *, struct ViceInodeInfo *, char *,
+                                   char *),
+                  FILE * fp,
+                  int (*judgeFun) (struct ViceInodeInfo *, afs_uint32, void *),
+                  int singleVolumeNumber,
+                  void *rock)
+{
+    int ret = 0;
+    struct ViceInodeInfo info;
+#ifdef DELETE_ZLC
+    int i; /* XXX this isn't set anywhere, nor was it set in
+           *     namei_ListAFSSubdirs.  wtf? */
+#endif
+
+    if (DecodeInode(path3, dname, &info, myIH->ih_vid) < 0) {
+       goto error;
+    }
+
+    info.linkCount =
+       namei_GetLinkCount2(linkHandle,
+                           info.inodeNumber, 1, 1, Testing);
+    if (info.linkCount == 0) {
+#ifdef DELETE_ZLC
+       Log("Found 0 link count file %s/%s, deleting it.\n", path3, dname);
+#ifdef AFS_SALSRV_ENV
+       /* defer -- the AddToZLCDeleteList() interface is not MT-safe */
+       ret = -2;
+#else /* !AFS_SALSRV_ENV */
+       AddToZLCDeleteList((char)i, dname);
+#endif /* !AFS_SALSRV_ENV */
+#else /* !DELETE_ZLC */
+       Log("Found 0 link count file %s/%s.\n", path3,
+           dname);
+#endif
+       goto error;
+    }
+
+    if (!judgeFun ||
+       (*judgeFun) (&info, singleVolumeNumber, rock)) {
+       ret = 1;
+        if ((*writeFun) (fp, &info, path3, dname) < 0) {
+           ret = -1;
+       }
+    }
+
+ error:
+    return ret;
+}
+
+/**
+ * listsubdirs work queue node.
+ */
+struct listsubdirs_work_node {
+#ifdef AFS_SALSRV_ENV
+    int *error;                         /**< *error set if an error was
+                                         *   encountered in any listsubdirs
+                                         *   thread. */
+#endif
+
+    IHandle_t * IH;                     /**< volume directory handle */
+    FdHandle_t *linkHandle;             /**< namei link count fd handle. when
+                                         *   examinining the link table special
+                                         *   inode, this will be pointed at the
+                                         *   link table
+                                         */
+    FILE * fp;                          /**< file pointer for writeFun */
+
+    /** function which will write inode metadata to fp */
+    int (*writeFun) (FILE *, struct ViceInodeInfo *, char *, char *);
+
+    /** inode filter function */
+    int (*judgeFun) (struct ViceInodeInfo *, afs_uint32, void *);
+    int singleVolumeNumber;             /**< volume id filter */
+    void * rock;                        /**< pointer passed to writeFun and judgeFun */
+    int code;                           /**< return code from examine function */
+    int special;                        /**< asserted when this is a volume
+                                        *   special file */
+};
+
+/**
+ * simple wrapper around _namei_examine_special and _namei_examine_reg.
+ *
+ * @param[in] work  the struct listsubdirs_work_node for the associated
+ *                  "list subdirs" job
+ * @param[in] dir   the directory to examine
+ * @param[in] filename  the filename in 'dir' to examine
+ *
+ * @return operation status
+ *   @retval 1  count this inode
+ *   @retval 0  don't count this inode
+ *   @retval -1 failure
+ */
+static_inline int
+_namei_examine_file(const struct listsubdirs_work_node *work, char *dir,
+                    char *filename)
+{
+    if (work->special) {
+       return _namei_examine_special(dir, filename, work->IH,
+                                     work->linkHandle, work->writeFun, work->fp,
+                                     work->judgeFun, work->singleVolumeNumber,
+                                     work->rock);
+    } else {
+       return _namei_examine_reg(dir, filename, work->IH,
+                                 work->linkHandle, work->writeFun, work->fp,
+                                 work->judgeFun, work->singleVolumeNumber,
+                                 work->rock);
+    }
+}
+
+
+#ifdef AFS_SALSRV_ENV
+/** @addtogroup afs_vol_salsrv_pario */
+/*@{*/
+
+/**
+ * arguments for the _namei_examine_file_cbk callback function.
+ */
+struct listsubdirs_args {
+    const struct listsubdirs_work_node *work; /**< arguments that are the same
+                                               *   for all invocations of
+                                               *   _namei_examine_file_cbk, in
+                                               *   threads */
+    int *result;        /**< where we can store the return code of _namei_examine_file */
+
+    char dir[512];      /**< directory to examine */
+    char filename[256]; /**< filename in 'dir' to examine */
+};
+
+/**
+ * a node in the list of results of listsubdir jobs.
+ */
+struct listsubdirs_result {
+    struct rx_queue q;
+    int inodes;        /**< return value from _namei_examine_file */
+};
+
+/**
+ * clean up a list of 'struct listsubdirs_result's and interpret the results.
+ *
+ * @param[in] resultlist  a list of 'struct listsubdirs_result's
+ *
+ * @return number of inodes found
+ *   @retval -1  error
+ */
+static int
+_namei_listsubdirs_cleanup_results(struct rx_queue *resultlist)
+{
+    struct listsubdirs_result *res, *nres;
+    int ret = 0;
+
+    for(queue_Scan(resultlist, res, nres, listsubdirs_result)) {
+       if (ret < 0) {
+           /* noop, retain erroneous error code */
+       } else if (res->inodes < 0) {
+           ret = -1;
+       } else {
+           ret += res->inodes;
+       }
+
+       queue_Remove(res);
+       free(res);
+       res = NULL;
+    }
+
+    return ret;
+}
+
+/**
+ * wait for the spawned listsubdirs jobs to finish, and return how many inodes
+ * they found.
+ *
+ * @param[in] queue    queue to wait to finish
+ * @param[in] resultlist list of 'struct listsubdirs_result's that the queued
+ *                       jobs are storing their results in
+ *
+ * @return number of inodes found
+ *   @retval -1  error
+ */
+static int
+_namei_listsubdirs_wait(struct afs_work_queue *queue, struct rx_queue *resultlist)
+{
+    int code;
 
+    code = afs_wq_wait_all(queue);
+    if (code) {
+       return -1;
+    }
 
-/* namei_ListAFSSubDirs
+    return _namei_listsubdirs_cleanup_results(resultlist);
+}
+
+/**
+ * work queue entry point for examining namei files.
+ *
+ * @param[in] queue        pointer to struct Vwork_queue
+ * @param[in] node         pointer to struct Vwork_queue_node
+ * @param[in] queue_rock   opaque pointer to struct salsrv_pool_state
+ * @param[in] node_rock    opaque pointer to struct listsubdirs_work_node
+ * @param[in] caller_rock  opaque pointer to struct salsrv_worker_thread_state
+ *
+ * @return operation status
  *
+ * @see Vwork_queue_callback_func_t
  *
- * Return values:
- * < 0 - an error
- * > = 0 - number of AFS files found.
+ * @internal
+ */
+static int
+_namei_examine_file_cbk(struct afs_work_queue *queue,
+                        struct afs_work_queue_node *node,
+                        void *queue_rock,
+                        void *node_rock,
+                        void *caller_rock)
+{
+    int code;
+    struct listsubdirs_args *args = node_rock;
+    const struct listsubdirs_work_node * work = args->work;
+    char *dir = args->dir;
+    char *filename = args->filename;
+
+    code = _namei_examine_file(work, dir, filename);
+
+    *(args->result) = code;
+
+    if (code < 0) {
+       *(work->error) = 1;
+       /* we've errored, so no point in letting more jobs continue */
+       afs_wq_shutdown(queue);
+    }
+
+    return 0;
+}
+
+static pthread_once_t wq_once = PTHREAD_ONCE_INIT;
+static pthread_key_t wq_key;
+
+/**
+ * create the wq_key key for storing a work queue.
+ */
+static void
+_namei_wq_keycreate(void)
+{
+    assert(pthread_key_create(&wq_key, NULL) == 0);
+}
+
+/**
+ * set the work queue for this thread to use for backgrounding namei ops.
+ *
+ * The work queue will be used in ListAFSSubdirs (called indirectly by
+ * ListViceInodes) to examine files in parallel.
+ *
+ * @param[in] wq  the work queue to use
+ */
+void
+namei_SetWorkQueue(struct afs_work_queue *wq)
+{
+    assert(pthread_once(&wq_once, _namei_wq_keycreate) == 0);
+
+    assert(pthread_setspecific(wq_key, wq) == 0);
+}
+
+/**
+ * enqueue an examine file work unit.
+ *
+ * @param[in] work     the _namei_examine_file arguments that are common to
+ *                     all callers within the same ListAFSFiles operation
+ * @param[in] dir      the specific directory to look at (string will be
+ *                     copied; can be stack/temporary memory)
+ * @param[in] filename the filename to look at (string will be copied; can be
+ *                     stack/temporary memory)
+ * @param[in] wq       work queue to enqueue this work unit to
+ * @param[in] resultlist the list to append the 'struct listsubdirs_result' to
+ *                       for the enqueued work unit
+ *
+ * @return operation status
+ *    @retval 0 success
+ *    @retval -1 fatal error
+ *
+ * @note errors MUST be indicated by a -1 error code and nothing else, to be
+ *       compatible with _namei_examine_reg and _namei_examine_special
+ *
+ * @internal
+ */
+static int
+_namei_examine_file_spawn(const struct listsubdirs_work_node *work,
+                          const char *dir, const char *filename,
+                          struct afs_work_queue *wq,
+                          struct rx_queue *resultlist)
+{
+    int code, ret = 0;
+    struct listsubdirs_args *args = NULL;
+    struct listsubdirs_result *result = NULL;
+    struct afs_work_queue_node *node = NULL;
+    struct afs_work_queue_add_opts opts;
+
+    args = malloc(sizeof(*args));
+    if (args == NULL) {
+       ret = -1;
+       goto error;
+    }
+
+    result = malloc(sizeof(*result));
+    if (result == NULL) {
+       ret = -1;
+       goto error;
+    }
+
+    code = afs_wq_node_alloc(&node);
+    if (code) {
+       ret = -1;
+       goto error;
+    }
+    code = afs_wq_node_set_detached(node);
+    if (code) {
+       ret = -1;
+       goto error;
+    }
+
+    args->work = work;
+    args->result = &result->inodes;
+    strlcpy(args->dir, dir, sizeof(args->dir));
+    strlcpy(args->filename, filename, sizeof(args->filename));
+
+    code = afs_wq_node_set_callback(node,
+                                        &_namei_examine_file_cbk,
+                                        args, &free);
+    if (code) {
+       ret = -1;
+       goto error;
+    }
+    args = NULL;
+
+    afs_wq_add_opts_init(&opts);
+    opts.donate = 1;
+
+    code = afs_wq_add(wq, node, &opts);
+    if (code) {
+       ret = -1;
+       goto error;
+    }
+    node = NULL;
+
+    queue_Append(resultlist, result);
+    result = NULL;
+
+ error:
+    if (node) {
+       afs_wq_node_put(node);
+       node = NULL;
+    }
+    if (args) {
+       free(args);
+       args = NULL;
+    }
+    if (result) {
+       free(result);
+       result = NULL;
+    }
+
+    return ret;
+}
+
+/*@}*/
+#else /* !AFS_SALSRV_ENV */
+# define _namei_examine_file_spawn(work, dir, file, wq, resultlist) \
+         _namei_examine_file(work, dir, file)
+#endif /* !AFS_SALSRV_ENV */
+
+/**
+ * traverse and check inodes.
+ *
+ * @param[in] dirIH               volume group directory handle
+ * @param[in] writeFun            function pointer which will write inode
+ *                                metadata to FILE stream fp
+ * @param[in] fp                  file stream where inode metadata gets
+ *                                written
+ * @param[in] judgeFun            inode filter function.  if not NULL, only
+ *                                inodes for which the filter returns non-zero
+ *                                will be written out by writeFun
+ * @param[in] singleVolumeNumber  volume id filter.  only inodes matching this
+ *                                filter are written out by writeFun
+ * @param[in] rock                opaque pointer passed to judgeFun and writeFun
+ *
+ * @return operation status
+ *    @retval <0 error
+ *    @retval >=0 number of matching inodes found
+ *
+ * @todo the salsrv implementation may consume a lot of
+ *       memory for a large volume.  at some point we should
+ *       probably write a background thread to asynchronously
+ *       clean up the resultlist nodes to reduce memory footprint
+ *
+ * @internal
  */
 static int
 namei_ListAFSSubDirs(IHandle_t * dirIH,
                     int (*writeFun) (FILE *, struct ViceInodeInfo *, char *,
-                                     char *), FILE * fp,
+                                     char *),
+                    FILE * fp,
                     int (*judgeFun) (struct ViceInodeInfo *, afs_uint32, void *),
                     afs_uint32 singleVolumeNumber, void *rock)
 {
+    int code = 0, ret = 0;
     IHandle_t myIH = *dirIH;
     namei_t name;
     char path1[512], path2[512], path3[512];
     DIR *dirp1, *dirp2, *dirp3;
     struct dirent *dp1, *dp2, *dp3;
-    struct ViceInodeInfo info;
     FdHandle_t linkHandle;
     int ninodes = 0;
+    struct listsubdirs_work_node work;
+#ifdef AFS_SALSRV_ENV
+    int error = 0;
+    struct afs_work_queue *wq;
+    int wq_up = 0;
+    struct rx_queue resultlist;
+#endif
 #ifdef DELETE_ZLC
     int i;
     static void AddToZLCDeleteList(char dir, char *name);
@@ -1357,41 +1947,82 @@ namei_ListAFSSubDirs(IHandle_t * dirIH,
     (void)strcat(path1, NAMEI_SPECDIR);
 
     linkHandle.fd_fd = -1;
+#ifdef AFS_SALSRV_ENV
+    assert(pthread_once(&wq_once, _namei_wq_keycreate) == 0);
+
+    wq = pthread_getspecific(wq_key);
+    if (!wq) {
+       ret = -1;
+       goto error;
+    }
+    wq_up = 1;
+    queue_Init(&resultlist);
+#endif
+
+    memset(&work, 0, sizeof(work));
+    work.linkHandle = &linkHandle;
+    work.IH = &myIH;
+    work.fp = fp;
+    work.writeFun = writeFun;
+    work.judgeFun = judgeFun;
+    work.singleVolumeNumber = singleVolumeNumber;
+    work.rock = rock;
+    work.special = 1;
+#ifdef AFS_SALSRV_ENV
+    work.error = &error;
+#endif
+
     dirp1 = opendir(path1);
     if (dirp1) {
        while ((dp1 = readdir(dirp1))) {
            if (*dp1->d_name == '.')
                continue;
-           if (DecodeInode(path1, dp1->d_name, &info, myIH.ih_vid) < 0)
-               continue;
-           if (info.u.param[2] != VI_LINKTABLE) {
-               info.linkCount = 1;
-           } else {
-               /* Open this handle */
-               (void)afs_snprintf(path2, sizeof path2, "%s/%s", path1,
-                                  dp1->d_name);
-               linkHandle.fd_fd = afs_open(path2, Testing ? O_RDONLY : O_RDWR, 0666);
-               info.linkCount =
-                   namei_GetLinkCount2(&linkHandle, (Inode) 0, 1, 1, Testing);
+
+#ifdef AFS_SALSRV_ENV
+           if (error) {
+               closedir(dirp1);
+               ret = -1;
+               goto error;
            }
-           if (judgeFun && !(*judgeFun) (&info, singleVolumeNumber, rock))
-               continue;
+#endif /* AFS_SALSRV_ENV */
 
-           if ((*writeFun) (fp, &info, path1, dp1->d_name) < 0) {
-               if (linkHandle.fd_fd >= 0)
-                   close(linkHandle.fd_fd);
+           code = _namei_examine_file_spawn(&work, path1, dp1->d_name, wq, &resultlist);
+
+           switch (code) {
+           case -1:
+               /* fatal error */
                closedir(dirp1);
-               return -1;
+               ret = -1;
+               goto error;
+
+           case 1:
+               /* count this inode */
+#ifndef AFS_SALSRV_ENV
+               ninodes++;
+#endif
+               break;
            }
-           ninodes++;
        }
        closedir(dirp1);
     }
 
+#ifdef AFS_SALSRV_ENV
+    /* effectively a barrier */
+    code = _namei_listsubdirs_wait(wq, &resultlist);
+    if (code < 0 || error) {
+       ret = -1;
+       goto error;
+    }
+    error = 0;
+    ninodes += code;
+#endif
+
     /* Now run through all the other subdirs */
     namei_HandleToVolDir(&name, &myIH);
     strlcpy(path1, name.n_path, sizeof(path1));
 
+    work.special = 0;
+
     dirp1 = opendir(path1);
     if (dirp1) {
        while ((dp1 = readdir(dirp1))) {
@@ -1411,40 +2042,41 @@ namei_ListAFSSubDirs(IHandle_t * dirIH,
                    /* Now we've got to the actual data */
                    afs_snprintf(path3, sizeof(path3), "%s/%s", path2,
                                 dp2->d_name);
+
                    dirp3 = opendir(path3);
                    if (dirp3) {
                        while ((dp3 = readdir(dirp3))) {
                            if (*dp3->d_name == '.')
                                continue;
-                           if (DecodeInode
-                               (path3, dp3->d_name, &info, myIH.ih_vid) < 0)
-                               continue;
-                           info.linkCount =
-                               namei_GetLinkCount2(&linkHandle,
-                                                  info.inodeNumber, 1, 1, Testing);
-                           if (info.linkCount == 0) {
-#ifdef DELETE_ZLC
-                               Log("Found 0 link count file %s/%s, deleting it.\n", path3, dp3->d_name);
-                               AddToZLCDeleteList((char)i, dp3->d_name);
-#else
-                               Log("Found 0 link count file %s/%s.\n", path3,
-                                   dp3->d_name);
-#endif
-                               continue;
+
+#ifdef AFS_SALSRV_ENV
+                           if (error) {
+                               closedir(dirp3);
+                               closedir(dirp2);
+                               closedir(dirp1);
+                               ret = -1;
+                               goto error;
                            }
-                           if (judgeFun
-                               && !(*judgeFun) (&info, singleVolumeNumber, rock))
-                               continue;
+#endif /* AFS_SALSRV_ENV */
 
-                           if ((*writeFun) (fp, &info, path3, dp3->d_name) <
-                               0) {
-                               close(linkHandle.fd_fd);
+                           code = _namei_examine_file_spawn(&work, path3,
+                                                            dp3->d_name, wq,
+                                                            &resultlist);
+
+                           switch (code) {
+                           case -1:
                                closedir(dirp3);
                                closedir(dirp2);
                                closedir(dirp1);
-                               return -1;
+                               ret = -1;
+                               goto error;
+
+                           case 1:
+#ifndef AFS_SALSRV_ENV
+                               ninodes++;
+#endif
+                               break;
                            }
-                           ninodes++;
                        }
                        closedir(dirp3);
                    }
@@ -1455,17 +2087,42 @@ namei_ListAFSSubDirs(IHandle_t * dirIH,
        closedir(dirp1);
     }
 
-    if (linkHandle.fd_fd >= 0)
-       close(linkHandle.fd_fd);
+#ifdef AFS_SALSRV_ENV
+    /* effectively a barrier */
+    code = _namei_listsubdirs_wait(wq, &resultlist);
+    if (code < 0 || error) {
+       ret = -1;
+       goto error;
+    }
+    error = 0;
+    ninodes += code;
+    wq_up = 0;
+#endif
+
     if (!ninodes) {
        /* Then why does this directory exist? Blow it away. */
        namei_HandleToVolDir(&name, dirIH);
        namei_RemoveDataDirectories(&name);
     }
 
-    return ninodes;
+ error:
+#ifdef AFS_SALSRV_ENV
+    if (wq_up) {
+       afs_wq_wait_all(wq);
+    }
+    _namei_listsubdirs_cleanup_results(&resultlist);
+#endif
+    if (linkHandle.fd_fd >= 0)
+       close(linkHandle.fd_fd);
+
+    if (!ret) {
+       ret = ninodes;
+    }
+    return ret;
 }
 
+/*@}*/
+
 static int
 DecodeVolumeName(char *name, unsigned int *vid)
 {
index b76cdf0..6705ebb 100644 (file)
@@ -77,6 +77,11 @@ void namei_HandleToName(namei_t * name, IHandle_t * h);
 int namei_ConvertROtoRWvolume(char *pname, afs_uint32 volumeId);
 int namei_replace_file_by_hardlink(IHandle_t *hLink, IHandle_t *hTarget);
 
+# ifdef AFS_SALSRV_ENV
+#  include <afs/work_queue.h>
+extern void namei_SetWorkQueue(struct afs_work_queue *wq);
+# endif
+
 #endif /* AFS_NAMEI_ENV */
 
 #endif /* _AFS_NAMEI_OPS_H_H_ */