diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 1aa5b59a2a4feeb666a4600cffb3411219c88fbd..17b5cb937342cdb8a2718da470f9e74b78595048 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -187,6 +187,11 @@ Description: reply_lock_interpret crash due to race with it and lock cancel.
 Details    : Do not replay locks that are being cancelled. Do not reference
              locks by their address during replay, just by their handle.
 
+Severity   : enhancement
+Bugzilla   : 11401
+Description: client-side metadata stat-ahead during readdir (directory readahead)
+Details    : Perform client-side metadata stat-ahead when the client detects
+             readdir and sequential stat of dir entries therein.
 
 --------------------------------------------------------------------------------
 
diff --git a/lustre/include/lustre_mds.h b/lustre/include/lustre_mds.h
index 981e4449370353bf46f4c41899547079f7aa0a84..0759c5ff27ec8c64f314c0d3e099080c6de9797f 100644
--- a/lustre/include/lustre_mds.h
+++ b/lustre/include/lustre_mds.h
@@ -113,6 +113,8 @@ int mds_reint_rec(struct mds_update_record *r, int offset,
 /* mds/mds_lov.c */
 
 /* mdc/mdc_locks.c */
+struct md_enqueue_info;
+
 int it_disposition(struct lookup_intent *it, int flag);
 void it_set_disposition(struct lookup_intent *it, int flag);
 void it_clear_disposition(struct lookup_intent *it, int flag);
@@ -120,6 +122,9 @@ int it_open_error(int phase, struct lookup_intent *it);
 void mdc_set_lock_data(__u64 *lockh, void *data);
 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
                       ldlm_iterator_t it, void *data);
+int mdc_revalidate_lock(struct obd_export *exp,
+                        struct lookup_intent *it,
+                        struct ll_fid *fid);
 int mdc_intent_lock(struct obd_export *exp,
                     struct mdc_op_data *,
                     void *lmm, int lmmsize,
@@ -130,6 +135,9 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 struct lookup_intent *it, struct mdc_op_data *data,
                 struct lustre_handle *lockh, void *lmm, int lmmlen,
                 int extra_lock_flags);
+int mdc_intent_getattr_async(struct obd_export *exp,
+                             struct md_enqueue_info *minfo,
+                             struct ldlm_enqueue_info *einfo);
 
 /* mdc/mdc_request.c */
 int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp);
@@ -197,6 +205,18 @@ static inline void mdc_pack_fid(struct ll_fid *fid, obd_id ino, __u32 gen,
         fid->f_type = type;
 }
 
+static inline int it_to_lock_mode(struct lookup_intent *it)
+{
+        const int cr_ops = IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP;
+
+        if (!(it->it_op & (IT_CREAT | cr_ops))) {
+                LBUG();
+                return -EINVAL;
+        }
+        /* IT_CREAT is tested first: it may be set together with IT_OPEN */
+        return (it->it_op & IT_CREAT) ? LCK_CW : LCK_CR;
+}
+
 /* ioctls for trying requests */
 #define IOC_REQUEST_TYPE                   'f'
 #define IOC_REQUEST_MIN_NR                 30
@@ -209,4 +229,25 @@ static inline void mdc_pack_fid(struct ll_fid *fid, obd_id ino, __u32 gen,
 #define IOC_REQUEST_CLOSE               _IOWR('f', 35, long)
 #define IOC_REQUEST_MAX_NR               35
 
+/* metadata stat-ahead: callback type stored in md_enqueue_info.mi_cb */
+typedef int (* md_enqueue_cb_t)(struct obd_export *exp,
+                                struct ptlrpc_request *req,
+                                struct md_enqueue_info *minfo,
+                                int rc);
+
+struct md_enqueue_info {
+        struct obd_export      *mi_exp;     /* export the enqueue is sent on */
+        struct mdc_op_data      mi_data;    /* operation data of the request */
+        struct lookup_intent    mi_it;      /* intent carried by the enqueue */
+        struct lustre_handle    mi_lockh;   /* lock handle */
+        struct dentry          *mi_dentry;  /* dentry the request is for */
+        md_enqueue_cb_t         mi_cb;      /* callback, see md_enqueue_cb_t */
+        void                   *mi_cbdata;  /* opaque data owned by mi_cb user */
+};
+
+struct mdc_enqueue_args {
+        struct md_enqueue_info   *ma_mi;    /* metadata enqueue descriptor */
+        struct ldlm_enqueue_info *ma_ei;    /* ldlm enqueue descriptor */
+};
+
 #endif
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index cfc8b9701375333d5c4b2685913b64b1760a6772..b43f64cb4cf689bf812face655c12f5299a11f82 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -106,8 +106,8 @@ struct ldlm_bl_work_item {
 static inline int have_expired_locks(void)
 {
         int need_to_run;
-
         ENTRY;
+
         spin_lock_bh(&waiting_locks_spinlock);
         need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
         spin_unlock_bh(&waiting_locks_spinlock);
diff --git a/lustre/llite/Makefile.in b/lustre/llite/Makefile.in
index dfa273bd92449893a7825a61544b8b9e6409f4e5..ff06efd3cdcdc47640cc4e3c23f0f09dc22fba6b 100644
--- a/lustre/llite/Makefile.in
+++ b/lustre/llite/Makefile.in
@@ -1,5 +1,5 @@
 MODULES := lustre
-lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o
+lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o statahead.o
 
 ifeq ($(PATCHLEVEL),4)
 lustre-objs += rw24.o super.o
diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c
index 12286953c98c6667aeb28d167be44a3fe015e527..29418ffe0bc1e86c68518654d56a88dd605c24f8 100644
--- a/lustre/llite/dcache.c
+++ b/lustre/llite/dcache.c
@@ -333,11 +333,11 @@ void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
 int ll_revalidate_it(struct dentry *de, int lookup_flags,
                      struct lookup_intent *it)
 {
-        int rc;
         struct mdc_op_data op_data;
         struct ptlrpc_request *req = NULL;
         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
         struct obd_export *exp;
+        int first = 0, rc;
 
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
@@ -426,11 +426,16 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
                 }
         }
 
+        if (it->it_op == IT_GETATTR)
+                first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
+
 do_lock:
         it->it_create_mode &= ~current->fs->umask;
 
         rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags,
                              &req, ll_mdc_blocking_ast, 0);
+        if (it->it_op == IT_GETATTR && !first)
+                ll_statahead_exit(de, rc);
         /* If req is NULL, then mdc_intent_lock only tried to do a lock match;
          * if all was well, it will return 1 if it found locks, 0 otherwise. */
         if (req == NULL && rc >= 0) {
diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c
index 2666a0411fdfbe081c6f674145cba1a16253d44d..60e2762a5a4fcc86f62c250c9cba98a1571f8f37 100644
--- a/lustre/llite/dir.c
+++ b/lustre/llite/dir.c
@@ -27,7 +27,6 @@
  */
 
 #include <linux/fs.h>
-#include <linux/ext2_fs.h>
 #include <linux/pagemap.h>
 #include <linux/mm.h>
 #include <linux/version.h>
@@ -49,8 +48,6 @@
 #include <lustre_dlm.h>
 #include "llite_internal.h"
 
-typedef struct ext2_dir_entry_2 ext2_dirent;
-
 #ifdef HAVE_PG_FS_MISC
 #define PageChecked(page)        test_bit(PG_fs_misc, &(page)->flags)
 #define SetPageChecked(page)     set_bit(PG_fs_misc, &(page)->flags)
@@ -105,18 +102,6 @@ static inline unsigned ext2_chunk_size(struct inode *inode)
         return inode->i_sb->s_blocksize;
 }
 
-static inline void ext2_put_page(struct page *page)
-{
-        kunmap(page);
-        page_cache_release(page);
-}
-
-static inline unsigned long dir_pages(struct inode *inode)
-{
-        return (inode->i_size+CFS_PAGE_SIZE-1) >> CFS_PAGE_SHIFT;
-}
-
-
 static void ext2_check_page(struct inode *dir, struct page *page)
 {
         unsigned chunk_size = ext2_chunk_size(dir);
@@ -205,7 +190,7 @@ fail:
         SetPageError(page);
 }
 
-static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
+struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
 {
         struct ldlm_res_id res_id =
                 { .name = { dir->i_ino, (__u64)dir->i_generation} };
@@ -264,24 +249,6 @@ fail:
         goto out_unlock;
 }
 
-/*
- * p is at least 6 bytes before the end of page
- */
-static inline ext2_dirent *ext2_next_entry(ext2_dirent *p)
-{
-        return (ext2_dirent *)((char*)p + le16_to_cpu(p->rec_len));
-}
-
-static inline unsigned
-ext2_validate_entry(char *base, unsigned offset, unsigned mask)
-{
-        ext2_dirent *de = (ext2_dirent*)(base + offset);
-        ext2_dirent *p = (ext2_dirent*)(base + (offset&mask));
-        while ((char*)p < (char*)de)
-                p = ext2_next_entry(p);
-        return (char *)p - base;
-}
-
 static unsigned char ext2_filetype_table[EXT2_FT_MAX] = {
         [EXT2_FT_UNKNOWN]       DT_UNKNOWN,
         [EXT2_FT_REG_FILE]      DT_REG,
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 87f7f1e575e0c73eb919438696f3fc24986c2dde..c59fcb626633c5f96930fd108cb1f5a4174c6f33 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -235,6 +235,9 @@ int ll_file_release(struct inode *inode, struct file *file)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
 
+        if (S_ISDIR(inode->i_mode))
+                ll_stop_statahead(inode);
+
         /* don't do anything for / */
         if (inode->i_sb->s_root == file->f_dentry)
                 RETURN(0);
@@ -262,6 +265,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
         struct inode *inode = file->f_dentry->d_inode;
         struct ptlrpc_request *req;
         int rc;
+        ENTRY;
 
         if (!parent)
                 RETURN(-ENOENT);
@@ -385,6 +389,9 @@ int ll_file_open(struct inode *inode, struct file *file)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
                inode->i_generation, inode, file->f_flags);
 
+        if (S_ISDIR(inode->i_mode) && lli->lli_opendir_pid == 0)
+                lli->lli_opendir_pid = current->pid;
+
         /* don't do anything for / */
         if (inode->i_sb->s_root == file->f_dentry)
                 RETURN(0);
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index b5a4d545342925058ec9b66fb2d9f7f5464d4f57..67617177aa1969c6ab6383f59386fbac696ef412 100644
--- a/lustre/llite/llite_internal.h
+++ b/lustre/llite/llite_internal.h
@@ -5,6 +5,7 @@
 #ifndef LLITE_INTERNAL_H
 #define LLITE_INTERNAL_H
 
+#include <linux/ext2_fs.h>
 #ifdef CONFIG_FS_POSIX_ACL
 # include <linux/fs.h>
 #ifdef HAVE_XATTR_ACL
@@ -107,6 +108,10 @@ struct ll_inode_info {
 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
         struct inode            lli_vfs_inode;
 #endif
+
+        /* metadata stat-ahead */
+        pid_t                   lli_opendir_pid;
+        struct ll_statahead_info *lli_sai;
 };
 
 /*
@@ -257,9 +262,19 @@ struct ll_sb_info {
         enum stats_track_type     ll_stats_track_type;
         int                       ll_stats_track_id;
         int                       ll_rw_stats_on;
-
         dev_t                     ll_sdev_orig; /* save s_dev before assign for
                                                  * clustred nfs */
+
+        /* metadata stat-ahead */
+        unsigned int              ll_sa_count; /* current statahead RPCs */
+        unsigned int              ll_sa_max;   /* max statahead RPCs */
+        unsigned int              ll_sa_wrong; /* statahead thread stopped for
+                                                * low hit ratio */
+        unsigned int              ll_sa_total; /* statahead thread started
+                                                * count */
+        unsigned long long        ll_sa_blocked; /* ls count waiting for
+                                                  * statahead */
+        unsigned long long        ll_sa_cached;  /* ls count got in cache */
 };
 
 #define LL_DEFAULT_MAX_RW_CHUNK         (32 * 1024 * 1024)
@@ -362,9 +377,9 @@ static inline struct inode *ll_info2i(struct ll_inode_info *lli)
 }
 
 struct it_cb_data {
-        struct inode *icbd_parent;
+        struct inode   *icbd_parent;
         struct dentry **icbd_childp;
-        obd_id hash;
+        obd_id          hash;
 };
 
 void ll_i2gids(__u32 *suppgids, struct inode *i1,struct inode *i2);
@@ -434,6 +449,38 @@ static void ll_stats_ops_tally(struct ll_sb_info *sbi, int op, int count) {}
 extern struct file_operations ll_dir_operations;
 extern struct inode_operations ll_dir_inode_operations;
 
+struct page *ll_get_dir_page(struct inode *dir, unsigned long n);
+/*
+ * p is at least 6 bytes before the end of page
+ */
+typedef struct ext2_dir_entry_2 ext2_dirent;
+
+static inline ext2_dirent *ext2_next_entry(ext2_dirent *p)
+{       /* step over this entry using its own rec_len */
+        return (ext2_dirent *)&((char *)p)[le16_to_cpu(p->rec_len)];
+}
+
+static inline unsigned
+ext2_validate_entry(char *base, unsigned offset, unsigned mask)
+{
+        char *end = base + offset;
+        char *pos = base + (offset & mask);
+        while (pos < end)       /* step entry by entry from the masked start */
+                pos = (char *)ext2_next_entry((ext2_dirent *)pos);
+        return pos - base;
+}
+
+static inline void ext2_put_page(struct page *page)
+{
+        kunmap(page);                   /* unmap the page first ... */
+        page_cache_release(page);       /* ... then release it */
+}
+
+static inline unsigned long dir_pages(struct inode *inode)
+{       /* pages needed to cover i_size bytes, rounded up */
+        return (inode->i_size + (CFS_PAGE_SIZE - 1)) >> CFS_PAGE_SHIFT;
+}
+
 /* llite/namei.c */
 int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir);
 struct inode *ll_iget(struct super_block *sb, ino_t hash,
@@ -449,6 +496,9 @@ int ll_prepare_mdc_op_data(struct mdc_op_data *,
 struct lookup_intent *ll_convert_intent(struct open_intent *oit,
                                         int lookup_flags);
 #endif
+int lookup_it_finish(struct ptlrpc_request *request, int offset,
+                     struct lookup_intent *it, void *data);
+void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
 
 /* llite/rw.c */
 int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to);
@@ -711,4 +761,35 @@ ssize_t ll_getxattr(struct dentry *dentry, const char *name,
 ssize_t ll_listxattr(struct dentry *dentry, char *buffer, size_t size);
 int ll_removexattr(struct dentry *dentry, const char *name);
 
+/* statahead.c */
+
+#define LL_STATAHEAD_MIN  1      /* initial per-directory sai_max */
+#define LL_STATAHEAD_DEF  32     /* default ll_sa_max */
+#define LL_STATAHEAD_MAX  10000  /* largest accepted statahead_max */
+
+/* per-inode stat-ahead state; used for directories only */
+struct ll_statahead_info {
+        struct inode           *sai_inode;      /* directory inode */
+        atomic_t                sai_refc;       /* hold a reference while
+                                                 * accessing this struct */
+        unsigned int            sai_max;        /* max ahead of lookup */
+        unsigned int            sai_sent;       /* stat requests sent count */
+        unsigned int            sai_replied;    /* stat requests whose reply
+                                                 * has been received */
+        unsigned int            sai_cached;     /* UPDATE lock was already
+                                                 * cached locally */
+        unsigned int            sai_hit;        /* hit count */
+        unsigned int            sai_miss;       /* miss count */
+        unsigned int            sai_consecutive_miss; /* consecutive misses */
+        unsigned                sai_ls_all:1;   /* ls -al: also stat-ahead
+                                                 * hidden entries */
+        struct ptlrpc_thread    sai_thread;     /* stat-ahead thread */
+        struct list_head        sai_entries;    /* stat-ahead entries */
+        unsigned int            sai_entries_nr; /* length of sai_entries */
+};
+
+int ll_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup);
+void ll_statahead_exit(struct dentry *dentry, int result);
+void ll_stop_statahead(struct inode *inode);
+
 #endif /* LLITE_INTERNAL_H */
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index 47f4cdf733a09c73ccfd93ac93124937c2e5dcda..4b49457b17a43a4d43ed92c9cb45e98ccbc867e5 100644
--- a/lustre/llite/llite_lib.c
+++ b/lustre/llite/llite_lib.c
@@ -95,6 +95,9 @@ static struct ll_sb_info *ll_init_sbi(void)
                 spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock);
         }
 
+        /* metadata statahead is enabled by default */
+        sbi->ll_sa_max = LL_STATAHEAD_DEF;
+
         RETURN(sbi);
 }
 
@@ -1125,6 +1128,12 @@ void ll_clear_inode(struct inode *inode)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
 
+        if (S_ISDIR(inode->i_mode)) {
+                /* these should have been cleared in ll_file_release */
+                LASSERT(lli->lli_sai == NULL);
+                LASSERT(lli->lli_opendir_pid == 0);
+        }
+
         ll_inode2fid(&fid, inode);
         clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags);
         mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode);
diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c
index 6382a4d61c7f4087025a860ca07bdc792c1a3306..159036b9ecbbff193d7c267fa040a4ad90422f95 100644
--- a/lustre/llite/lproc_llite.c
+++ b/lustre/llite/lproc_llite.c
@@ -437,6 +437,59 @@ static int ll_wr_track_gid(struct file *file, const char *buffer,
         return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID));
 }
 
+static int ll_rd_statahead_count(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+        struct ll_sb_info *sbi = ll_s2sbi((struct super_block *)data);
+
+        /* print the current number of stat-ahead RPCs */
+        return snprintf(page, count, "%u\n", sbi->ll_sa_count);
+}
+
+static int ll_rd_statahead_max(char *page, char **start, off_t off,
+                               int count, int *eof, void *data)
+{
+        struct ll_sb_info *sbi = ll_s2sbi((struct super_block *)data);
+
+        /* print the configured limit on stat-ahead RPCs */
+        return snprintf(page, count, "%u\n", sbi->ll_sa_max);
+}
+
+static int ll_wr_statahead_max(struct file *file, const char *buffer,
+                               unsigned long count, void *data)
+{
+        struct ll_sb_info *sbi = ll_s2sbi((struct super_block *)data);
+        int new_max, err;
+
+        err = lprocfs_write_helper(buffer, count, &new_max);
+        if (err)
+                return err;
+        /* an out-of-range value is logged and ignored; the write succeeds */
+        if (new_max < 0 || new_max > LL_STATAHEAD_MAX)
+                CERROR("Bad statahead_max value %d. Valid values are in the "
+                       "range [0, %d]\n", new_max, LL_STATAHEAD_MAX);
+        else
+                sbi->ll_sa_max = new_max;
+
+        return count;
+}
+
+static int ll_rd_statahead_stats(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+        struct ll_sb_info *sbi = ll_s2sbi((struct super_block *)data);
+        /* "ls total" is the sum of the blocked and the cached ls counts */
+        unsigned long long ls_total = sbi->ll_sa_blocked + sbi->ll_sa_cached;
+
+        return snprintf(page, count,
+                        "statahead wrong: %u\n"
+                        "statahead total: %u\n"
+                        "ls blocked:      %llu\n"
+                        "ls total:        %llu\n",
+                        sbi->ll_sa_wrong, sbi->ll_sa_total,
+                        sbi->ll_sa_blocked, ls_total);
+}
+
 static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
@@ -458,6 +511,9 @@ static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "stats_track_pid",  ll_rd_track_pid, ll_wr_track_pid, 0 },
         { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
         { "stats_track_gid",  ll_rd_track_gid, ll_wr_track_gid, 0 },
+        { "statahead_count", ll_rd_statahead_count, 0, 0 },
+        { "statahead_max",   ll_rd_statahead_max, ll_wr_statahead_max, 0 },
+        { "statahead_stats", ll_rd_statahead_stats, 0, 0 },
         { 0 }
 };
 
diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c
index 773f828f8d41e759d5d877d0fb79af90066027a2..52472bcfd851ef5aefad614743bc1245a0835104 100644
--- a/lustre/llite/namei.c
+++ b/lustre/llite/namei.c
@@ -373,7 +373,7 @@ static void ll_d_add(struct dentry *de, struct inode *inode)
  * in ll_revalidate_it.  After revaliadate inode will be have hashed aliases
  * and it triggers BUG_ON in d_instantiate_unique (bug #10954).
  */
-struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
+static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
 {
         struct list_head *tmp;
         struct dentry *dentry;
@@ -442,8 +442,8 @@ struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
         return de;
 }
 
-static int lookup_it_finish(struct ptlrpc_request *request, int offset,
-                            struct lookup_intent *it, void *data)
+int lookup_it_finish(struct ptlrpc_request *request, int offset,
+                     struct lookup_intent *it, void *data)
 {
         struct it_cb_data *icbd = data;
         struct dentry **de = icbd->icbd_childp;
@@ -530,8 +530,17 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
                         RETURN(ERR_PTR(rc));
         }
 
-        icbd.icbd_childp = &dentry;
+        if (it->it_op == IT_GETATTR) {
+                rc = ll_statahead_enter(parent, &dentry, 1);
+                if (rc >= 0) {
+                        ll_statahead_exit(dentry, rc);
+                        if (rc == 1)
+                                RETURN(retval = dentry);
+                }
+        }
+
         icbd.icbd_parent = parent;
+        icbd.icbd_childp = &dentry;
 
         rc = ll_prepare_mdc_op_data(&op_data, parent, NULL, dentry->d_name.name,
                                     dentry->d_name.len, lookup_flags, NULL);
@@ -540,9 +549,10 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
 
         it->it_create_mode &= ~current->fs->umask;
 
+        up(&parent->i_sem);
         rc = mdc_intent_lock(ll_i2mdcexp(parent), &op_data, NULL, 0, it,
                              lookup_flags, &req, ll_mdc_blocking_ast, 0);
-
+        down(&parent->i_sem);
         if (rc < 0)
                 GOTO(out, retval = ERR_PTR(rc));
 
diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c
new file mode 100644
index 0000000000000000000000000000000000000000..717661186dce9dbcedfbf16f73c520d22d93a019
--- /dev/null
+++ b/lustre/llite/statahead.c
@@ -0,0 +1,847 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2007 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <linux/fs.h>
+#include <linux/sched.h>
+#include <linux/mm.h>
+#include <linux/smp_lock.h>
+#include <linux/highmem.h>
+#include <linux/pagemap.h>
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <obd_support.h>
+#include <lustre_lite.h>
+#include <lustre_dlm.h>
+#include <linux/lustre_version.h>
+#include "llite_internal.h"
+
+struct ll_sai_entry {
+        struct list_head        se_list;        /* link in sai_entries */
+        int                     se_index;       /* index identifying the entry */
+        int                     se_stat;        /* SA_ENTRY_* state */
+};
+
+enum {
+        SA_ENTRY_UNSTATED = 0,  /* not yet marked by the reply callback */
+        SA_ENTRY_STATED         /* reply callback ran (success or error) */
+};
+
+static struct ll_statahead_info *ll_sai_alloc(void)
+{
+        struct ll_statahead_info *new_sai;
+
+        OBD_ALLOC_PTR(new_sai);
+        if (new_sai != NULL) {
+                /* start out with a single reference */
+                atomic_set(&new_sai->sai_refc, 1);
+                new_sai->sai_max = LL_STATAHEAD_MIN;
+                INIT_LIST_HEAD(&new_sai->sai_entries);
+                init_waitqueue_head(&new_sai->sai_thread.t_ctl_waitq);
+        }
+        return new_sai;
+}
+
+static inline struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
+{
+        LASSERT(sai);
+        atomic_inc(&sai->sai_refc);     /* extra ref; dropped by ll_sai_put() */
+        return sai;                     /* hand the same pointer back */
+}
+
+static void ll_sai_put(struct ll_statahead_info *sai)
+{
+        struct inode         *dir = sai->sai_inode;
+        struct ll_inode_info *lli = ll_i2info(dir);
+        struct ll_sai_entry  *cur, *tmp;
+
+        /* only the final put proceeds, and it returns holding lli_lock */
+        if (!atomic_dec_and_lock(&sai->sai_refc, &lli->lli_lock))
+                return;
+
+        LASSERT(sai->sai_thread.t_flags & SVC_STOPPED);
+        list_for_each_entry_safe(cur, tmp, &sai->sai_entries, se_list) {
+                list_del(&cur->se_list);
+                OBD_FREE_PTR(cur);
+        }
+        lli->lli_sai = NULL;
+        OBD_FREE_PTR(sai);
+        spin_unlock(&lli->lli_lock);
+        iput(dir);
+}
+
+/* Allocate an entry for @index in state @stat and queue it on @sai. */
+static struct ll_sai_entry *ll_sai_entry_get(struct ll_statahead_info *sai,
+                                             int index, int stat)
+{
+        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+        struct ll_sb_info    *sbi = ll_i2sbi(sai->sai_inode);
+        struct ll_sai_entry  *entry;
+
+        OBD_ALLOC_PTR(entry);
+        if (entry == NULL)
+                return NULL;
+
+        CDEBUG(D_READA, "sai entry %p index %d, stat %d\n", entry, index, stat);
+        entry->se_index = index;
+        entry->se_stat  = stat;
+
+        spin_lock(&lli->lli_lock);
+        list_add_tail(&entry->se_list, &sai->sai_entries);
+        sai->sai_entries_nr++;
+        sbi->ll_sa_count = sai->sai_entries_nr;
+        spin_unlock(&lli->lli_lock);
+        /* ll_sa_max is tunable at runtime, so do not LASSERT against it */
+        return entry;
+}
+
+static void ll_sai_entry_set(struct ll_statahead_info *sai, int index, int stat)
+{
+        struct ll_sai_entry *entry;
+
+        /* find the entry by index; a miss or an already-set entry is fatal */
+        list_for_each_entry(entry, &sai->sai_entries, se_list) {
+                if (entry->se_index != index)
+                        continue;
+                LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
+                entry->se_stat = stat;
+                CDEBUG(D_READA, "set sai entry %p index %d stat %d\n",
+                       entry, index, stat);
+                return;
+        }
+        CERROR("can't find sai entry index %d\n", index);
+        LBUG();
+}
+
+/* Has the entry at the head of sai_entries been stated already? */
+static int ll_sai_entry_stated(struct ll_statahead_info *sai)
+{
+        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+        struct ll_sai_entry  *head;
+        int                   stated;
+
+        spin_lock(&lli->lli_lock);
+        if (list_empty(&sai->sai_entries)) {
+                spin_unlock(&lli->lli_lock);
+                return 0;
+        }
+        head = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
+        CDEBUG(D_READA, "sai entry %p index %d stat %d\n",
+               head, head->se_index, head->se_stat);
+        stated = (head->se_stat != SA_ENTRY_UNSTATED);
+        spin_unlock(&lli->lli_lock);
+        return stated;
+}
+
+/* Unlink and free the entry at the list head; caller holds lli_lock. */
+static void ll_sai_entry_put(struct ll_statahead_info *sai)
+{
+        struct ll_sai_entry *head;
+
+        LASSERT(!list_empty(&sai->sai_entries));
+        LASSERT(sai->sai_entries_nr > 0);
+
+        head = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
+        sai->sai_entries_nr--;
+        list_del(&head->se_list);
+
+        CDEBUG(D_READA, "free sa entry %p index %d stat %d\n",
+               head, head->se_index, head->se_stat);
+        OBD_FREE_PTR(head);
+}
+
+/* Stat-ahead reply callback: finish the lookup/revalidate, mark the entry
+ * stated (even on error), wake the thread's waitq and free @minfo. */
+static int ll_statahead_interpret(struct obd_export *exp,
+                                  struct ptlrpc_request *req,
+                                  struct md_enqueue_info *minfo, int rc)
+{
+        struct lookup_intent     *it = &minfo->mi_it;
+        struct dentry            *dentry = minfo->mi_dentry;
+        struct inode             *dir = dentry->d_parent->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_statahead_info *sai;
+        ENTRY;
+
+        CDEBUG(D_READA, "statahead %.*s rc %d\n",
+               dentry->d_name.len, dentry->d_name.name, rc);
+        if (rc)
+                GOTO(out, rc);
+
+        if (dentry->d_inode == NULL) {
+                /* lookup */
+                struct dentry    *save = dentry;
+                struct it_cb_data icbd = {
+                        .icbd_parent = dir,
+                        .icbd_childp = &dentry
+                };
+
+                down(&dir->i_sem);
+                rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
+                if (!rc) {
+                        LASSERT(dentry->d_inode);
+                        if (dentry != save)
+                                dput(save);
+                        ll_lookup_finish_locks(it, dentry);
+                }
+                up(&dir->i_sem);
+        } else {
+                /* revalidate */
+                struct mds_body *body;
+
+                body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
+                                      sizeof(*body));
+                if (body == NULL)       /* never dereference a missing body */
+                        GOTO(out, rc = -EPROTO);
+                if (memcmp(&minfo->mi_data.fid2, &body->fid1,
+                           sizeof(body->fid1))) {
+                        ll_unhash_aliases(dentry->d_inode);
+                        GOTO(out, rc = -EAGAIN);
+                }
+
+                rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry);
+                if (rc) {
+                        ll_unhash_aliases(dentry->d_inode);
+                        GOTO(out, rc);
+                }
+
+                spin_lock(&dcache_lock);
+                lock_dentry(dentry);
+                __d_drop(dentry);
+                dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
+                unlock_dentry(dentry);
+                __d_rehash(dentry, 0);
+                spin_unlock(&dcache_lock);
+
+                ll_lookup_finish_locks(it, dentry);
+        }
+        EXIT;
+out:
+        spin_lock(&lli->lli_lock);
+        sai = lli->lli_sai;
+        if (sai) {
+                sai->sai_replied++;
+                ll_sai_entry_set(sai, (int)(long)minfo->mi_cbdata,
+                                 SA_ENTRY_STATED);
+                wake_up(&sai->sai_thread.t_ctl_waitq);
+        }
+        spin_unlock(&lli->lli_lock);
+        ll_intent_release(it);
+        OBD_FREE_PTR(minfo);
+        dput(dentry);
+        return rc;
+}
+
+static void sa_args_fini(struct md_enqueue_info *minfo,
+                         struct ldlm_enqueue_info *einfo)
+{
+        LASSERT(minfo && einfo);        /* both descriptors are required */
+        OBD_FREE_PTR(minfo);            /* free the pair from sa_args_prep() */
+        OBD_FREE_PTR(einfo);
+}
+
+/* Build the enqueue descriptors for one IT_GETATTR stat-ahead request. */
+static int sa_args_prep(struct inode *dir, struct dentry *dentry,
+                        struct md_enqueue_info **pmi,
+                        struct ldlm_enqueue_info **pei)
+{
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct md_enqueue_info   *minfo;
+        struct ldlm_enqueue_info *einfo;
+
+        OBD_ALLOC_PTR(einfo);
+        if (einfo == NULL)
+                return -ENOMEM;
+
+        OBD_ALLOC_PTR(minfo);
+        if (minfo == NULL) {
+                OBD_FREE_PTR(einfo);
+                return -ENOMEM;
+        }
+
+        minfo->mi_exp = ll_i2mdcexp(dir);
+        intent_init(&minfo->mi_it, IT_GETATTR);
+        minfo->mi_dentry = dentry;
+        minfo->mi_cb = ll_statahead_interpret;
+        /* widen through long so the int <-> pointer casts are size-matched */
+        minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_sent;
+
+        einfo->ei_type   = LDLM_IBITS;
+        einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
+        einfo->ei_cb_bl  = ll_mdc_blocking_ast;
+        einfo->ei_cb_cp  = ldlm_completion_ast;
+        einfo->ei_cb_gl  = NULL;
+        einfo->ei_cbdata = NULL;
+        *pmi = minfo;
+        *pei = einfo;
+        return 0;
+}
+
+/* similar to ll_lookup_it(): send an async getattr-by-name for @dentry.
+ * returns 0 if mdc_intent_getattr_async() took the request, else non-zero. */
+static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
+{
+        struct ldlm_enqueue_info *ei;
+        struct md_enqueue_info   *mi;
+        int                       rc;
+        ENTRY;
+
+        rc = sa_args_prep(dir, dentry, &mi, &ei);
+        if (rc != 0)
+                RETURN(rc);
+
+        rc = ll_prepare_mdc_op_data(&mi->mi_data, dir, NULL,
+                                    dentry->d_name.name, dentry->d_name.len, 0,
+                                    NULL);
+        if (!rc)
+                rc = mdc_intent_getattr_async(mi->mi_exp, mi, ei);
+        /* on failure the arguments were not handed off: free them here */
+        if (rc != 0)
+                sa_args_fini(mi, ei);
+        RETURN(rc);
+}
+
+/* similar to ll_revalidate_it().
+ * return 1: dentry valid.
+ *        0: will send stat-ahead request.
+ *        -errno: prepare stat-ahead request failed. */
+static int do_sa_revalidate(struct dentry *dentry)
+{
+        struct inode             *inode = dentry->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dentry->d_parent->d_inode);
+        struct ldlm_enqueue_info *ei;
+        struct md_enqueue_info   *mi;
+        struct lookup_intent      it;
+        struct ll_fid             fid;
+        int                       rc;
+        ENTRY;
+
+        /* negative dentries and mountpoints are reported valid as they are */
+        if (inode == NULL || d_mountpoint(dentry))
+                RETURN(1);
+
+        /* rc == 1 from mdc_revalidate_lock() counts as "already cached" */
+        ll_inode2fid(&fid, inode);
+        intent_init(&it, IT_GETATTR);
+        rc = mdc_revalidate_lock(ll_i2mdcexp(inode), &it, &fid);
+        if (rc == 1) {
+                ll_intent_release(&it);
+                lli->lli_sai->sai_cached++;
+                wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
+                RETURN(1);
+        }
+
+        /* otherwise queue an async getattr, the same way do_sa_lookup() does */
+        rc = sa_args_prep(dentry->d_parent->d_inode, dentry, &mi, &ei);
+        if (rc != 0)
+                RETURN(rc);
+
+        rc = ll_prepare_mdc_op_data(&mi->mi_data, dentry->d_parent->d_inode,
+                                    inode, dentry->d_name.name,
+                                    dentry->d_name.len, 0, NULL);
+        if (!rc)
+                rc = mdc_intent_getattr_async(mi->mi_exp, mi, ei);
+
+        /* on failure the arguments were not handed off: free them here */
+        if (rc != 0)
+                sa_args_fini(mi, ei);
+
+        RETURN(rc);
+}
+
+/* copied from kernel: fill @this with @name, @namelen and the name's hash */
+static inline void name2qstr(struct qstr *this, const char *name, int namelen)
+{
+        const unsigned char *bytes = (const unsigned char *)name;
+        unsigned long        hash = init_name_hash();
+        int                  i = 0;
+
+        /* fold the name into the hash one byte at a time */
+        while (i < namelen) {
+                hash = partial_name_hash(bytes[i], hash);
+                i++;
+        }
+
+        this->len  = namelen;
+        this->name = name;
+        this->hash = end_name_hash(hash);
+}
+
+static int ll_statahead_one(struct dentry *parent, ext2_dirent *de)
+{
+        struct inode           *dir = parent->d_inode;
+        struct ll_inode_info   *lli = ll_i2info(dir);
+        struct qstr             name;
+        struct dentry          *dentry;
+        struct ll_sai_entry    *se;
+        int                     rc;
+        ENTRY;
+
+        /* stat-ahead one dirent. 0: sent, 1: already valid, <0: error */
+        name2qstr(&name, de->name, de->name_len);
+        se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_sent,
+                              SA_ENTRY_UNSTATED);
+
+        down(&dir->i_sem);
+        if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
+                CDEBUG(D_READA, "parent dentry@%p %.*s is "
+                       "DCACHE_LUSTRE_INVALID, skip statahead\n",
+                       parent, parent->d_name.len, parent->d_name.name);
+                up(&dir->i_sem);
+                GOTO(out, rc = -EINVAL);
+        }
+
+        dentry = d_lookup(parent, &name);
+        if (!dentry) {
+                /* not in the dcache: allocate a dentry, look it up async */
+                dentry = d_alloc(parent, &name);
+                up(&dir->i_sem);
+                rc = -ENOMEM;
+                if (dentry) {
+                        rc = do_sa_lookup(dir, dentry);
+                        if (rc)
+                                dput(dentry);
+                }
+                GOTO(out, rc);
+        }
+        up(&dir->i_sem);
+        /* the dentry reference is dropped here only if no request was sent */
+        rc = do_sa_revalidate(dentry);
+        if (rc)
+                dput(dentry);
+        GOTO(out, rc);
+out:
+        if (rc) {
+                se->se_stat = rc;
+                CDEBUG(D_READA, "set sai entry %p index %d stat %d\n",
+                       se, se->se_index, se->se_stat);
+                wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
+        }
+        lli->lli_sai->sai_sent++;
+        return rc;
+}
+                
+static inline int sa_check_stop(struct ll_statahead_info *sai)
+{       /* 1 once SVC_STOPPING is set on the statahead thread, else 0 */
+        return (sai->sai_thread.t_flags & SVC_STOPPING) != 0;
+}
+
+static inline int sa_not_full(struct ll_statahead_info *sai)
+{       /* non-zero while sent - miss - hit stays below sai_max */
+        return sai->sai_sent - sai->sai_miss - sai->sai_hit < sai->sai_max;
+}
+
+struct ll_sa_thread_args {
+        struct dentry   *sta_parent;    /* dentry of the dir to stat-ahead */
+        pid_t            sta_pid;       /* pid used in the "ll_sa_%u" name */
+};
+
+static int ll_statahead_thread(void *arg)
+{
+        struct ll_sa_thread_args *sta = arg;
+        struct dentry            *parent = dget(sta->sta_parent);
+        struct inode             *dir = parent->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
+        struct ptlrpc_thread     *thread = &sai->sai_thread;
+        struct l_wait_info        lwi = { 0 };
+        unsigned long             index = 0;
+        __u64                     offset = 0;
+        int                       skip = 0;
+        int                       rc = 0;
+        char                      name[16] = "";
+        ENTRY;
+        /* thread body: walk the dir pages and stat-ahead each entry */
+        sbi->ll_sa_total++;
+        /* sta is on the starter's stack: use it only before SVC_RUNNING */
+        snprintf(name, 15, "ll_sa_%u", sta->sta_pid);
+        cfs_daemonize(name);
+        thread->t_flags = SVC_RUNNING;
+        wake_up(&thread->t_ctl_waitq);
+        CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
+
+        if (sai->sai_ls_all)
+                CDEBUG(D_READA, "do statahead for hidden files\n");
+
+        while (1) {
+                unsigned long npages = dir_pages(dir);
+
+                /* hit ratio < 80% */
+                if ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
+                     (sai->sai_consecutive_miss > 8)) {
+                        sbi->ll_sa_wrong++;
+                        CDEBUG(D_READA, "statahead for dir %.*s hit ratio too "
+                               "low: hit/miss %u/%u, sent/replied %u/%u, "
+                               "cached %u\n",
+                               parent->d_name.len, parent->d_name.name,
+                               sai->sai_hit, sai->sai_miss, sai->sai_sent,
+                               sai->sai_replied, sai->sai_cached);
+                        break;
+                }
+
+                /* reached (or, if the dir shrank, passed) the end of dir */
+                if (index >= npages) {
+                        CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
+                               index, npages);
+                        break;
+                }
+                /* sleep until told to stop or the window has room again */
+                l_wait_event(thread->t_ctl_waitq,
+                             sa_check_stop(sai) || sa_not_full(sai),
+                             &lwi);
+
+                if (sa_check_stop(sai))
+                        break;
+
+                for (; index < npages; index++, offset = 0) {
+                        char *kaddr, *limit;
+                        ext2_dirent *de;
+                        struct page *page;
+
+                        CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu"
+                               "/%lu size %llu\n",
+                               CFS_PAGE_SIZE, dir->i_ino, dir->i_generation,
+                               index, npages, dir->i_size);
+
+                        page = ll_get_dir_page(dir, index);
+                        npages = dir_pages(dir);
+
+                        if (IS_ERR(page)) {
+                                rc = PTR_ERR(page);
+                                CERROR("error reading dir %lu/%u page %lu: "
+                                       "rc %d\n",
+                                       dir->i_ino, dir->i_generation, index,
+                                       rc);
+                                GOTO(out, rc);
+                        }
+                        /* offset != 0: resume a page left when full */
+                        kaddr = page_address(page);
+                        de = (ext2_dirent *)(kaddr + offset);
+                        limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
+                        for (; (char*)de <= limit && sa_not_full(sai);
+                             de = ext2_next_entry(de)) {
+                                if (!de->inode)
+                                        continue;
+
+                                /* don't stat-ahead ".", ".." */
+                                if (skip < 2) {
+                                        skip++;
+                                        continue;
+                                }
+
+                                /* don't stat-ahead for hidden files */
+                                if (de->name[0] == '.' && !sai->sai_ls_all)
+                                        continue;
+
+                                /* don't stat-ahead for the first de */
+                                if (skip < 3) {
+                                        skip++;
+                                        continue;
+                                }
+
+                                rc = ll_statahead_one(parent, de);
+                                if (rc < 0) {
+                                        ext2_put_page(page);
+                                        GOTO(out, rc);
+                                }
+                        }
+                        offset = (char *)de - kaddr;
+                        ext2_put_page(page);
+
+                        if ((char *)de <= limit)
+                                /* !sa_not_full() */
+                                break;
+                }
+        }
+        EXIT;
+out:
+        thread->t_flags = SVC_STOPPED;
+        wake_up(&thread->t_ctl_waitq);
+        lli->lli_opendir_pid = 0; /* avoid statahead again */
+        ll_sai_put(sai);
+        dput(parent);
+        return 0;
+}
+
+/* called in ll_file_release: stop the statahead thread and wait for it */
+void ll_stop_statahead(struct inode *inode)
+{
+        struct ll_inode_info     *lli = ll_i2info(inode);
+        struct ll_statahead_info *sai;
+        struct ptlrpc_thread     *thread;
+
+        /* don't check pid here. upon fork, if parent closedir before child,
+         * child will not have chance to stop this thread. */
+        lli->lli_opendir_pid = 0;
+
+        /* pin sai under lli_lock; lli->lli_sai may change once unlocked */
+        spin_lock(&lli->lli_lock);
+        sai = lli->lli_sai ? ll_sai_get(lli->lli_sai) : NULL;
+        spin_unlock(&lli->lli_lock);
+        if (sai == NULL)
+                return;
+
+        thread = &sai->sai_thread;
+        /* SVC_STOPPED must not be overwritten or the wait below never ends */
+        if (!(thread->t_flags & SVC_STOPPED))
+                thread->t_flags = SVC_STOPPING;
+        wake_up(&thread->t_ctl_waitq);
+        wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+        ll_sai_put(sai);
+        CDEBUG(D_READA, "stop statahead thread, pid %d\n", current->pid);
+}
+
+enum {  /* values returned by is_first_dirent() */
+        LS_NONE_FIRST_DE = 0,   /* not first dirent, or is "." */
+        LS_FIRST_DE,            /* the first non-hidden dirent */
+        LS_FIRST_DOT_DE         /* the first hidden dirent, that is ".xxx" */
+};
+
+static int is_first_dirent(struct inode *dir, struct dentry *dentry)
+{
+        struct qstr   *d_name = &dentry->d_name;
+        unsigned long  npages = dir_pages(dir);
+        struct page   *page;
+        ext2_dirent   *de;
+        unsigned long  index;
+        __u64          offset = 0;
+        char          *kaddr, *limit;
+        int            dot_de = 1; /* only dotfiles seen so far */
+        int            rc = LS_NONE_FIRST_DE;
+        ENTRY;
+        /* returns an LS_* value: is @dentry the first usable entry of @dir? */
+        page = ll_get_dir_page(dir, 0);
+        if (IS_ERR(page)) {
+                CERROR("error reading dir %lu/%u page 0: rc %ld\n",
+                       dir->i_ino, dir->i_generation, PTR_ERR(page));
+                RETURN(LS_NONE_FIRST_DE);
+        }
+
+        kaddr = page_address(page);
+        de = (ext2_dirent *)kaddr;
+        if (!(de->name_len == 1 && strncmp(de->name, ".", 1) == 0))
+                CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
+        de = ext2_next_entry(de); /* skip ".", or ignore bad entry */
+        if (!(de->name_len == 2 && strncmp(de->name, "..", 2) == 0))
+                CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
+        de = ext2_next_entry(de); /* skip "..", or ignore bad entry */
+
+        offset = (char *)de - kaddr;
+
+        for (index = 0; index < npages; offset = 0) {
+                de = (ext2_dirent *)(kaddr + offset);
+                limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
+                for (; (char*)de <= limit; de = ext2_next_entry(de)) {
+                        if (!de->inode)
+                                continue;
+                        /* the first non-dot entry clears dot_de for good */
+                        if (de->name[0] != '.')
+                                dot_de = 0;
+
+                        if (dot_de && d_name->name[0] != '.') {
+                                CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
+                                       d_name->len, d_name->name,
+                                       de->name_len, de->name);
+                                continue;
+                        }
+                        /* first candidate entry: it alone decides the result */
+                        if (d_name->len == de->name_len &&
+                            !strncmp(d_name->name, de->name, d_name->len))
+                                rc = LS_FIRST_DE + dot_de;
+                        else
+                                rc = LS_NONE_FIRST_DE;
+                        GOTO(out, rc);
+                }
+
+                if (++index >= npages)
+                        break;
+
+                ext2_put_page(page);
+
+                page = ll_get_dir_page(dir, index);
+                if (IS_ERR(page)) {
+                        CERROR("error reading dir %lu/%u page %lu: rc %ld\n",
+                               dir->i_ino, dir->i_generation, index,
+                               PTR_ERR(page));
+                        RETURN(LS_NONE_FIRST_DE);
+                }
+                kaddr = page_address(page);
+        }
+        CERROR("%.*s not found in dir %.*s!\n", d_name->len, d_name->name,
+               dentry->d_parent->d_name.len, dentry->d_parent->d_name.name);
+        EXIT;
+out:
+        ext2_put_page(page);
+        return rc;
+}
+
+/* start stat-ahead thread if this is the first dir entry, otherwise if a thread
+ * is started already, wait until thread is ahead of me.
+ * Return value:
+ *    0 -- miss,
+ *    1 -- hit,
+ *    -EEXIST -- stat ahead thread started, and this is the first try.
+ *    other negative value -- error (-ENOTSUPP, -EBADF, -EBUSY, -ENOMEM, ...).
+ */
+int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
+{
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_statahead_info *sai;
+        struct ll_sa_thread_args  sta;
+        int                       rc;
+        ENTRY;
+
+        if (sbi->ll_sa_max == 0)
+                RETURN(-ENOTSUPP);
+
+        /* not the same process, don't statahead */
+        if (lli->lli_opendir_pid != current->pid)
+                RETURN(-EBADF);
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_sai) {
+                sai = ll_sai_get(lli->lli_sai);
+                spin_unlock(&lli->lli_lock);
+                /* thread exists: use its result, or wait for it */
+                if (ll_sai_entry_stated(sai)) {
+                        sbi->ll_sa_cached++;
+                } else {
+                        struct l_wait_info lwi = { 0 };
+                        /* NOTE: assumes caller holds dir->i_sem */
+                        sbi->ll_sa_blocked++;
+                        up(&dir->i_sem);
+                        /* thread started already, avoid double-stat */
+                        l_wait_event(sai->sai_thread.t_ctl_waitq,
+                                     ll_sai_entry_stated(sai) ||
+                                     sai->sai_thread.t_flags & SVC_STOPPED,
+                                     &lwi);
+                        down(&dir->i_sem);
+                }
+
+                ll_sai_put(sai);
+                /* lookup: prefer a dentry the thread may have hashed */
+                if (lookup) {
+                        struct dentry *result;
+
+                        result = d_lookup((*dentryp)->d_parent,
+                                          &(*dentryp)->d_name);
+                        if (result) {
+                                LASSERT(result != *dentryp);
+                                dput(*dentryp);
+                                *dentryp = result;
+                        }
+                        RETURN(result != NULL);
+                }
+                /* do nothing for revalidate */
+                RETURN(0);
+        }
+        spin_unlock(&lli->lli_lock);
+        /* no thread yet: only a stat of the first dirent may start one */
+        rc = is_first_dirent(dir, *dentryp);
+        if (!rc) {
+                /* optimization: don't statahead for this pid any longer */
+                spin_lock(&lli->lli_lock);
+                if (lli->lli_sai == NULL)
+                        lli->lli_opendir_pid = 0;
+                spin_unlock(&lli->lli_lock);
+                RETURN(-EBADF);
+        }
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_sai == NULL) {
+                lli->lli_sai = ll_sai_alloc();
+                if (lli->lli_sai == NULL) {
+                        spin_unlock(&lli->lli_lock);
+                        RETURN(-ENOMEM);
+                }
+        } else {
+                /* sai is already there */
+                spin_unlock(&lli->lli_lock);
+                RETURN(-EBUSY);
+        }
+        spin_unlock(&lli->lli_lock);
+        /* NOTE(review): lli_sai is re-read here after lli_lock was dropped */
+        sai = lli->lli_sai;
+        sai->sai_inode = igrab(dir);
+        sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
+        /* sta is on this stack; the wait_event() below keeps it valid */
+        sta.sta_parent = (*dentryp)->d_parent;
+        sta.sta_pid    = current->pid;
+        rc = kernel_thread(ll_statahead_thread, &sta, 0);
+        if (rc < 0) {
+                CERROR("can't start ll_sa thread, rc: %d\n", rc);
+                ll_sai_put(sai);
+                RETURN(rc);
+        }
+
+        wait_event(sai->sai_thread.t_ctl_waitq, 
+                   sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED));
+        ll_sai_put(sai);
+
+        /* we don't stat-ahead for the first dirent since we are already in
+         * lookup, and -EEXIST also indicates that this is the first dirent.
+         */
+        RETURN(-EEXIST);
+}
+
+/* update hit/miss count: result == 1 is a hit, anything else a miss */
+void ll_statahead_exit(struct dentry *dentry, int result)
+{
+        struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode);
+        struct ll_sb_info    *sbi = ll_i2sbi(dentry->d_parent->d_inode);
+
+        if (lli->lli_opendir_pid != current->pid)
+                return;
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_sai) {
+                struct ll_statahead_info *sai = lli->lli_sai;
+
+                ll_sai_entry_put(sai);
+                if (result == 1) {
+                        sai->sai_hit++;
+                        sai->sai_consecutive_miss = 0;
+                        sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
+                        CDEBUG(D_READA, "statahead %.*s hit (hit/miss %u/%u)\n",
+                               dentry->d_name.len, dentry->d_name.name,
+                               sai->sai_hit, sai->sai_miss);
+                } else {
+                        sai->sai_miss++;
+                        sai->sai_consecutive_miss++;
+                        /* a miss always happens because some dentry was added
+                         * by the statahead thread and, in the meantime, the
+                         * `ls` process found this dentry while its d_op was
+                         * still NULL; revalidate was then not done and
+                         * ll_statahead_exit() was not called for that dentry,
+                         * so the statahead thread must be behind the `ls`
+                         * process: put one more entry to let it go ahead.
+                         */
+                        ll_sai_entry_put(sai);
+                        CDEBUG(D_READA, "statahead %.*s miss (hit/miss %u/%u)\n",
+                               dentry->d_name.len, dentry->d_name.name,
+                               sai->sai_hit, sai->sai_miss);
+                }
+                wake_up(&sai->sai_thread.t_ctl_waitq);
+        }
+        spin_unlock(&lli->lli_lock);
+}
diff --git a/lustre/llite/xattr.c b/lustre/llite/xattr.c
index d123ba144504c0193f35854f14b8e5cb891ebf1b..5834f02b81d3d67fac4401df279ae11c69466e2b 100644
--- a/lustre/llite/xattr.c
+++ b/lustre/llite/xattr.c
@@ -239,6 +239,8 @@ int ll_getxattr_common(struct inode *inode, const char *name,
                 posix_acl_release(acl);
                 RETURN(rc);
         }
+        if (xattr_type == XATTR_ACL_DEFAULT_T && !S_ISDIR(inode->i_mode))
+                RETURN(-ENODATA);
 #endif
 
 do_getxattr:
diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c
index 8b7c6afa15c7df2230112165b1d5bcb27abe1b0c..89d1f0d54f444ba26a3c6ce7d6208e9624cf30f2 100644
--- a/lustre/mdc/mdc_lib.c
+++ b/lustre/mdc/mdc_lib.c
@@ -315,7 +315,8 @@ void mdc_getattr_pack(struct ptlrpc_request *req, int offset, int valid,
                 char *tmp;
                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1,
                                      data->namelen + 1);
-                LOGL0(data->name, data->namelen, tmp);
+                memcpy(tmp, data->name, data->namelen);
+                data->name = tmp;
         }
 }
 
@@ -396,8 +397,8 @@ void mdc_exit_request(struct client_obd *cli)
 
         spin_lock(&cli->cl_loi_list_lock);
         cli->cl_r_in_flight--;
+
         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-                
                 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
                         /* No free request slots anymore */
                         break;
diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c
index 7962b0ae4aab164e7d68b7b3561c858d26ab1532..3cf911a78aad2f68b0fa5c23b91e3efda50c0dcf 100644
--- a/lustre/mdc/mdc_locks.c
+++ b/lustre/mdc/mdc_locks.c
@@ -59,18 +59,6 @@ void it_clear_disposition(struct lookup_intent *it, int flag)
 }
 EXPORT_SYMBOL(it_clear_disposition);
 
-static int it_to_lock_mode(struct lookup_intent *it)
-{
-        /* CREAT needs to be tested before open (both could be set) */
-        if (it->it_op & IT_CREAT)
-                return LCK_CW;
-        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
-                return LCK_CR;
-
-        LBUG();
-        RETURN(-EINVAL);
-}
-
 int it_open_error(int phase, struct lookup_intent *it)
 {
         if (it_disposition(it, DISP_OPEN_OPEN)) {
@@ -195,25 +183,28 @@ static int round_up(int val)
  * but this is incredibly unlikely, and questionable whether the client
  * could do MDS recovery under OOM anyways... */
 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
-                                struct mds_body *body, int size[6])
+                                struct mds_body *body)
 {
-        int new_size, old_size;
+        int old_len, new_size, old_size;
+        struct lustre_msg *old_msg = req->rq_reqmsg;
         struct lustre_msg *new_msg;
 
+        old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
         /* save old size */
-        old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
-
-        size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
-        new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
+        old_size = lustre_msg_size(lustre_request_magic(req),
+                                   req->rq_reqmsg->lm_bufcount,
+                                   req->rq_reqmsg->lm_buflens);
+
+        lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
+                              body->eadatasize);
+        new_size = lustre_msg_size(lustre_request_magic(req),
+                                   req->rq_reqmsg->lm_bufcount,
+                                   req->rq_reqmsg->lm_buflens);
         OBD_ALLOC(new_msg, new_size);
         if (new_msg != NULL) {
-                struct lustre_msg *old_msg = req->rq_reqmsg;
-
-                DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
+                DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
                           body->eadatasize);
                 memcpy(new_msg, old_msg, old_size);
-                lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
-                                      body->eadatasize);
 
                 spin_lock(&req->rq_lock);
                 req->rq_reqmsg = new_msg;
@@ -222,107 +213,85 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
 
                 OBD_FREE(old_msg, old_size);
         } else {
+                lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
                 body->valid &= ~OBD_MD_FLEASIZE;
                 body->eadatasize = 0;
         }
 }
 
-/* We always reserve enough space in the reply packet for a stripe MD, because
- * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
-                struct lookup_intent *it, struct mdc_op_data *op_data,
-                struct lustre_handle *lockh, void *lmm, int lmmsize,
-                int extra_lock_flags)
+static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
+                                                   struct lookup_intent *it,
+                                                   struct mdc_op_data *data,
+                                                   void *lmm, int lmmsize)
 {
         struct ptlrpc_request *req;
-        struct obd_device *obddev = class_exp2obd(exp);
-        struct ldlm_res_id res_id =
-                { .name = {op_data->fid1.id, op_data->fid1.generation} };
-        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
-        struct ldlm_request *lockreq;
         struct ldlm_intent *lit;
-        struct ldlm_reply *lockrep;
+        struct obd_device *obddev = class_exp2obd(exp);
         int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*lockreq),
+                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
-                        0, 0, 0, 0 };
+                        [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
+                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
+                        /* As an optimization, we allocate an RPC request buffer
+                         * for at least a default-sized LOV EA even if we aren't
+                         * sending one.  We grow the whole request to the next
+                         * power-of-two size since we get that much from a slab
+                         * allocation anyways. This avoids an allocation below
+                         * in the common case where we need to save a
+                         * default-sized LOV EA for open replay. */
+                        [DLM_INTENT_REC_OFF+2]= max(lmmsize,
+                                         obddev->u.cli.cl_default_mds_easize) };
         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                           [DLM_LOCKREPLY_OFF]   = sizeof(*lockrep),
+                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
-                                                   cl_max_mds_easize, 0 };
-        int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
-        int repbufcnt = 4, rc;
-        void *eadata;
-        ENTRY;
-
-        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
-//        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
-//                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
-
-        if (it->it_op & IT_OPEN) {
-                CFS_LIST_HEAD(cancels);
-                int count = 0;
-                int mode;
-                
-                it->it_create_mode |= S_IFREG;
-
-                size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
-                size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
-                /* As an optimization, we allocate an RPC request buffer for
-                 * at least a default-sized LOV EA even if we aren't sending
-                 * one.  We grow the whole request to the next power-of-two
-                 * size since we get that much from a slab allocation anyways.
-                 * This avoids an allocation below in the common case where
-                 * we need to save a default-sized LOV EA for open replay. */
-                size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
-                                          obddev->u.cli.cl_default_mds_easize);
-                rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
-                                     size);
-                if (rc & (rc - 1))
-                        size[DLM_INTENT_REC_OFF + 2] =
-                                 min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
-                                     obddev->u.cli.cl_max_mds_easize);
-
-                /* If inode is known, cancel conflicting OPEN locks. */
-                if (op_data->fid2.id) {
-                        if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
-                                mode = LCK_CW;
+                                                        cl_max_mds_easize,
+                           [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
+        CFS_LIST_HEAD(cancels);
+        int count = 0;
+        int mode;
+        int rc;
+
+        it->it_create_mode |= S_IFREG;
+
+        rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
+        if (rc & (rc - 1))
+                size[DLM_INTENT_REC_OFF + 2] =
+                         min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
+                             obddev->u.cli.cl_max_mds_easize);
+
+        /* If inode is known, cancel conflicting OPEN locks. */
+        if (data->fid2.id) {
+                if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
+                        mode = LCK_CW;
 #ifdef FMODE_EXEC
-                        else if (it->it_flags & FMODE_EXEC)
-                                mode = LCK_PR;
+                else if (it->it_flags & FMODE_EXEC)
+                        mode = LCK_PR;
 #endif
-                        else 
-                                mode = LCK_CR;
-                        count = mdc_resource_get_unused(exp, &op_data->fid2,
-                                                        &cancels, mode,
-                                                        MDS_INODELOCK_OPEN);
-                }
-
-                /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
-                if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
-                        mode = LCK_EX;
-                else
+                else 
                         mode = LCK_CR;
-                count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
-                                                 mode, MDS_INODELOCK_UPDATE);
-                if (it->it_flags & O_JOIN_FILE) {
-                        /* join is like an unlink of the tail */
-                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                        size[DLM_INTENT_REC_OFF + 3] =
-                                                 sizeof(struct mds_rec_join);
-                        req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
-                                                    count);
-                        mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
-                                      (*(__u64 *)op_data->data));
-                } else {
-                        req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
-                                                    count);
-                }
+                count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
+                                                mode, MDS_INODELOCK_OPEN);
+        }
 
-                if (!req)
-                        RETURN(-ENOMEM);
+        /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
+        if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
+                mode = LCK_EX;
+        else
+                mode = LCK_CR;
+        count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
+                                         MDS_INODELOCK_UPDATE);
+        if (it->it_flags & O_JOIN_FILE) {
+                __u64 head_size = (*(__u64 *)data->data);
+                /* join is like an unlink of the tail */
+                size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
+                req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
+                mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, head_size);
+        } else {
+                req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
+        }
 
+        if (req) {
                 spin_lock(&req->rq_lock);
                 req->rq_replay = 1;
                 spin_unlock(&req->rq_lock);
@@ -333,76 +302,110 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
+                mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
                               it->it_create_mode, 0, it->it_flags,
                               lmm, lmmsize);
 
-                repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
-        } else if (it->it_op & IT_UNLINK) {
-                size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
-                size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
-                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
-                if (!req)
-                        RETURN(-ENOMEM);
+                ptlrpc_req_set_repsize(req, 5, repsize);
+        }
+        return req;
+}
 
+static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
+                                                     struct lookup_intent *it,
+                                                     struct mdc_op_data *data)
+{
+        struct ptlrpc_request *req;
+        struct ldlm_intent *lit;
+        struct obd_device *obddev = class_exp2obd(exp);
+        int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
+                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
+                        [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_unlink),
+                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
+        int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
+                           [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
+                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
+                                                        cl_max_mds_easize,
+                           [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
+                                                        cl_max_mds_cookiesize };
+
+        req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
+        if (req) {
                 /* pack the intent */
                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
                                      sizeof(*lit));
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
-
-                repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
-        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
-                obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
-                                  OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
-                                  OBD_MD_FLDIREA;
-                size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
-                size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
+                mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
 
-                if (it->it_op & IT_GETATTR)
-                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                ptlrpc_req_set_repsize(req, 5, repsize);
+        }
+        return req;
+}
 
-                req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
-                if (!req)
-                        RETURN(-ENOMEM);
+static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
+                                                     struct lookup_intent *it,
+                                                     struct mdc_op_data *data)
+{
+        struct ptlrpc_request *req;
+        struct ldlm_intent *lit;
+        struct obd_device *obddev = class_exp2obd(exp);
+        int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
+                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
+                        [DLM_INTENT_REC_OFF]  = sizeof(struct mds_body),
+                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
+        int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
+                           [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
+                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
+                                                        cl_max_mds_easize,
+                           [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
+        obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
+                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
 
+        req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
+        if (req) {
                 /* pack the intent */
                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
                                      sizeof(*lit));
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
-                                 it->it_flags, op_data);
-
-                repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
-        } else if (it->it_op == IT_READDIR) {
-                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
-                if (!req)
-                        RETURN(-ENOMEM);
-
-                repbufcnt = 2;
-        } else {
-                LBUG();
-                RETURN(-EINVAL);
+                mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
+                                 data);
+                ptlrpc_req_set_repsize(req, 5, repsize);
         }
+        return req;
+}
 
-        /* get ready for the reply */
-        ptlrpc_req_set_repsize(req, repbufcnt, repsize);
+static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
+{
+        struct ptlrpc_request *req;
+        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
+        int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply) };
+        /* No intent record: just ptlrpc body plus lock request/reply. */
+        req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+        if (req)
+                ptlrpc_req_set_repsize(req, 2, repsize);
+        return req;     /* NULL when ldlm_prep_enqueue_req() fails */
+}
 
-         /* It is important to obtain rpc_lock first (if applicable), so that
-          * threads that are serialised with rpc_lock are not polluting our
-          * rpcs in flight counter */
-        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
-        mdc_enter_request(&obddev->u.cli);
-        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
-                              0, NULL, lockh, 0);
-        mdc_exit_request(&obddev->u.cli);
-        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+static int mdc_finish_enqueue(struct obd_export *exp,
+                              struct ptlrpc_request *req,
+                              struct ldlm_enqueue_info *einfo,
+                              struct lookup_intent *it,
+                              struct lustre_handle *lockh,
+                              int rc)
+{
+        struct ldlm_request *lockreq;
+        struct ldlm_reply *lockrep;
+        ENTRY;
 
         /* Similarly, if we're going to replay this request, we don't want to
          * actually get a lock, just perform the intent. */
@@ -456,16 +459,17 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
          * It's important that we do this first!  Otherwise we might exit the
          * function without doing so, and try to replay a failed create
          * (bug 3440) */
-        if (it->it_op & IT_OPEN && req->rq_replay &&
-            (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
+        if ((it->it_op & IT_OPEN) &&
+            req->rq_replay &&
+            (!it_disposition(it, DISP_OPEN_OPEN) ||
+             it->d.lustre.it_status != 0))
                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
 
         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
 
         /* We know what to expect, so we do any byte flipping required here */
-        LASSERT(repbufcnt == 5 || repbufcnt == 2);
-        if (repbufcnt == 5) {
+        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                 struct mds_body *body;
 
                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
@@ -484,6 +488,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                         mdc_set_open_replay_data(NULL, req);
 
                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
+                        void *eadata;
+
                         /* The eadata is opaque; just check that it is there.
                          * Eventually, obd_unpackmd() will check the contents */
                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
@@ -493,15 +499,17 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                                 RETURN (-EPROTO);
                         }
                         if (body->valid & OBD_MD_FLMODEASIZE) {
+                                struct obd_device *obddev = class_exp2obd(exp);
+
                                 if (obddev->u.cli.cl_max_mds_easize < 
-                                                        body->max_mdsize) {
+                                    body->max_mdsize) {
                                         obddev->u.cli.cl_max_mds_easize = 
                                                 body->max_mdsize;
                                         CDEBUG(D_INFO, "maxeasize become %d\n",
                                                body->max_mdsize);
                                 }
                                 if (obddev->u.cli.cl_max_mds_cookiesize <
-                                                        body->max_cookiesize) {
+                                    body->max_cookiesize) {
                                         obddev->u.cli.cl_max_mds_cookiesize =
                                                 body->max_cookiesize;
                                         CDEBUG(D_INFO, "cookiesize become %d\n",
@@ -514,10 +522,11 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                          * reallocate it here to hold the actual LOV EA. */
                         if (it->it_op & IT_OPEN) {
                                 int offset = DLM_INTENT_REC_OFF + 2;
+                                void *lmm;
 
                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
                                     body->eadatasize)
-                                        mdc_realloc_openmsg(req, body, size);
+                                        mdc_realloc_openmsg(req, body);
 
                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
                                                      body->eadatasize);
@@ -529,8 +538,203 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 
         RETURN(rc);
 }
+
+/* Pack the intent request for it->it_op, enqueue it, post-process the reply.
+ * Reply space for a stripe MD is reserved since the file type is unknown. */
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                struct lookup_intent *it, struct mdc_op_data *data,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
+                int extra_lock_flags)
+{
+        struct ptlrpc_request *req;
+        struct obd_device *obddev = class_exp2obd(exp);
+        struct ldlm_res_id res_id =
+                { .name = {data->fid1.id, data->fid1.generation} };
+        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
+        int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+        int rc;
+        ENTRY;
+
+        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
+        if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+
+        if (it->it_op & IT_OPEN) {
+                req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
+                if (it->it_flags & O_JOIN_FILE) {
+                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                }
+        } else if (it->it_op & IT_UNLINK) {
+                req = mdc_intent_unlink_pack(exp, it, data);
+        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
+                req = mdc_intent_lookup_pack(exp, it, data);
+        } else if (it->it_op == IT_READDIR) {
+                req = mdc_intent_readdir_pack(exp);
+        } else {
+                CERROR("bad it_op %x\n", it->it_op);
+                RETURN(-EINVAL);
+        }
+
+        if (!req)
+                RETURN(-ENOMEM);
+
+        /* It is important to obtain rpc_lock first (if applicable), so that
+         * threads that are serialised with rpc_lock are not polluting our
+         * rpcs in flight counter */
+        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+        mdc_enter_request(&obddev->u.cli);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
+                              0, NULL, lockh, 0);
+        mdc_exit_request(&obddev->u.cli);
+        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+
+        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+
+        RETURN(rc);
+}
 EXPORT_SYMBOL(mdc_enqueue);
 
+int mdc_revalidate_lock(struct obd_export *exp,
+                        struct lookup_intent *it,
+                        struct ll_fid *fid)
+{
+        /* Callers (revalidate_it) should already hold a lock, so we could
+         * just return 1; instead verify it by matching a CR, then CW, then
+         * PR lock on @fid, saving the handle and mode in @it on success. */
+        struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
+        struct lustre_handle lockh;
+        ldlm_policy_data_t policy;
+        int mode = LCK_CR;
+        int rc;
+
+        /* Not all attributes are kept under the UPDATE lock (owner, group
+         * and ACLs are under the LOOKUP lock), so GETATTR needs both
+         * inodebits; any other intent only needs LOOKUP. */
+        policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
+                MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
+                MDS_INODELOCK_LOOKUP;
+
+        rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED,
+                             &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
+        if (!rc) {
+                mode = LCK_CW;
+                rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                                     LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+                                     &policy, LCK_CW, &lockh);
+        }
+        if (!rc) {
+                mode = LCK_PR;
+                rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                                     LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+                                     &policy, LCK_PR, &lockh);
+        }
+        if (rc) {
+                memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
+                it->d.lustre.it_lock_mode = mode;
+        }
+
+        return rc;      /* nonzero when a lock was matched */
+}
+EXPORT_SYMBOL(mdc_revalidate_lock);
+
+static int mdc_finish_intent_lock(struct obd_export *exp,
+                                  struct ptlrpc_request *req,
+                                  struct mdc_op_data *data,
+                                  struct lookup_intent *it,
+                                  struct lustre_handle *lockh)
+{
+        struct mds_body *mds_body;
+        struct lustre_handle old_lock;
+        struct ldlm_lock *lock;
+        int rc;
+        ENTRY;
+
+        LASSERT(req != NULL);
+        LASSERT(req != LP_POISON);
+        LASSERT(req->rq_repmsg != LP_POISON);
+
+        if (!it_disposition(it, DISP_IT_EXECD)) {
+                /* The server failed before it even started executing the
+                 * intent, e.g. because it couldn't unpack the request. */
+                LASSERT(it->d.lustre.it_status != 0);
+                RETURN(it->d.lustre.it_status);
+        }
+        rc = it_open_error(DISP_IT_EXECD, it);
+        if (rc)
+                RETURN(rc);
+
+        mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
+                                  sizeof(*mds_body));
+        LASSERT(mds_body != NULL);    /* mdc_finish_enqueue checked */
+        LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* ... and swabbed */
+
+        /* If we were revalidating a fid/name pair, mark the intent in
+         * case we fail and get called again from lookup */
+        if (data->fid2.id && (it->it_op != IT_GETATTR)) {
+                it_set_disposition(it, DISP_ENQ_COMPLETE);
+                /* Also: did we find the same inode? */
+                if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2))) 
+                        RETURN(-ESTALE);
+        }
+
+        rc = it_open_error(DISP_LOOKUP_EXECD, it);
+        if (rc)
+                RETURN(rc);
+
+        /* Keep the request around for the later phases of the call: a
+         * reference is taken once per DISP_ENQ_*_REF disposition and is
+         * dropped by the function named beside each addref below. */
+        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
+            it_disposition(it, DISP_OPEN_CREATE) &&
+            !it_open_error(DISP_OPEN_CREATE, it)) {
+                it_set_disposition(it, DISP_ENQ_CREATE_REF);
+                ptlrpc_request_addref(req); /* balanced in ll_create_node */
+        }
+        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
+            it_disposition(it, DISP_OPEN_OPEN) &&
+            !it_open_error(DISP_OPEN_OPEN, it)) {
+                it_set_disposition(it, DISP_ENQ_OPEN_REF);
+                ptlrpc_request_addref(req); /* balanced in ll_file_open */
+                /* BUG 11546 - eviction in the middle of open rpc processing */
+                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
+        }
+
+        if (it->it_op & IT_CREAT) {
+                /* XXX this belongs in ll_create_it */
+        } else if (it->it_op == IT_OPEN) {
+                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
+        } else {
+                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
+        }
+
+        /* If we already have a matching lock, then cancel the new
+         * one.  We have to set the data here instead of in
+         * mdc_enqueue, because we need to use the child's inode as
+         * the l_ast_data to match, and that's not available until
+         * intent_finish has performed the iget(). */
+        lock = ldlm_handle2lock(lockh);
+        if (lock) {
+                ldlm_policy_data_t policy = lock->l_policy_data;
+
+                LDLM_DEBUG(lock, "matching against this");
+                LDLM_LOCK_PUT(lock);
+                memcpy(&old_lock, lockh, sizeof(*lockh));
+                if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
+                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
+                        ldlm_lock_decref_and_cancel(lockh,
+                                                    it->d.lustre.it_lock_mode);
+                        memcpy(lockh, &old_lock, sizeof(old_lock));
+                        memcpy(&it->d.lustre.it_lock_handle, lockh,
+                               sizeof(*lockh));
+                }
+        }
+
+        CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
+               data->namelen, data->name, ldlm_it2str(it->it_op),
+               it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
+        RETURN(rc);
+}
+
 /* 
  * This long block is all about fixing up the lock and request state
  * so that it is correct as of the moment _before_ the operation was
@@ -564,12 +768,9 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
 {
         struct lustre_handle lockh;
-        struct ptlrpc_request *request;
-        int rc = 0;
-        struct mds_body *mds_body;
-        struct lustre_handle old_lock;
-        struct ldlm_lock *lock;
+        int rc;
         ENTRY;
+
         LASSERT(it);
 
         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
@@ -578,43 +779,7 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
 
         if (op_data->fid2.id &&
             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
-                /* We could just return 1 immediately, but since we should only
-                 * be called in revalidate_it if we already have a lock, let's
-                 * verify that. */
-                struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
-                                                     op_data->fid2.generation}};
-                struct lustre_handle lockh;
-                ldlm_policy_data_t policy;
-                int mode = LCK_CR;
-
-                /* As not all attributes are kept under update lock, e.g. 
-                   owner/group/acls are under lookup lock, we need both 
-                   ibits for GETATTR. */
-                policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
-                        MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
-                        MDS_INODELOCK_LOOKUP;
-
-                rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                     LDLM_FL_BLOCK_GRANTED, &res_id,
-                                     LDLM_IBITS, &policy, LCK_CR, &lockh);
-                if (!rc) {
-                        mode = LCK_CW;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy,LCK_CW,&lockh);
-                }
-                if (!rc) {
-                        mode = LCK_PR;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy,LCK_PR,&lockh);
-                }
-                if (rc) {
-                        memcpy(&it->d.lustre.it_lock_handle, &lockh,
-                               sizeof(lockh));
-                        it->d.lustre.it_lock_mode = mode;
-                }
-
+                rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
                 /* Only return failure if it was not GETATTR by cfid
                    (from inode_revalidate) */
                 if (rc || op_data->namelen != 0)
@@ -646,90 +811,100 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
                  * lookup, so we clear DISP_ENQ_COMPLETE */
                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
         }
-        request = *reqp = it->d.lustre.it_data;
-        LASSERT(request != NULL);
-        LASSERT(request != LP_POISON);
-        LASSERT(request->rq_repmsg != LP_POISON);
 
-        if (!it_disposition(it, DISP_IT_EXECD)) {
-                /* The server failed before it even started executing the
-                 * intent, i.e. because it couldn't unpack the request. */
-                LASSERT(it->d.lustre.it_status != 0);
-                RETURN(it->d.lustre.it_status);
-        }
-        rc = it_open_error(DISP_IT_EXECD, it);
-        if (rc)
-                RETURN(rc);
+        *reqp = it->d.lustre.it_data;
+        rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
 
-        mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
-                                  sizeof(*mds_body));
-        LASSERT(mds_body != NULL);           /* mdc_enqueue checked */
-        LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
+        RETURN(rc);
+}
+EXPORT_SYMBOL(mdc_intent_lock);
 
-        /* If we were revalidating a fid/name pair, mark the intent in
-         * case we fail and get called again from lookup */
-        if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
-                it_set_disposition(it, DISP_ENQ_COMPLETE);
-                /* Also: did we find the same inode? */
-                if (memcmp(&op_data->fid2, &mds_body->fid1,
-                           sizeof(op_data->fid2)))
-                        RETURN (-ESTALE);
-        }
+static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
+                                              void *unused, int rc)
+{
+        struct mdc_enqueue_args  *ma;
+        struct md_enqueue_info   *minfo;
+        struct ldlm_enqueue_info *einfo;
+        struct obd_export        *exp;
+        struct lookup_intent     *it;
+        struct lustre_handle     *lockh;
+        struct obd_device        *obddev;
+        int                       flags = LDLM_FL_HAS_INTENT;
+        ENTRY;
 
-        rc = it_open_error(DISP_LOOKUP_EXECD, it);
+        ma = (struct mdc_enqueue_args *)&req->rq_async_args;
+        minfo = ma->ma_mi;
+        einfo = ma->ma_ei;
+
+        exp   = minfo->mi_exp;
+        it    = &minfo->mi_it;
+        lockh = &minfo->mi_lockh;
+
+        obddev = class_exp2obd(exp);
+
+        mdc_exit_request(&obddev->u.cli);
+
+        rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
+                                   &flags, NULL, 0, NULL, lockh, rc);
+
+        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
         if (rc)
-                RETURN(rc);
+                GOTO(out, rc);
 
-        /* keep requests around for the multiple phases of the call
-         * this shows the DISP_XX must guarantee we make it into the call
-         */
-        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
-            it_disposition(it, DISP_OPEN_CREATE) &&
-            !it_open_error(DISP_OPEN_CREATE, it)) {
-                it_set_disposition(it, DISP_ENQ_CREATE_REF);
-                ptlrpc_request_addref(request); /* balanced in ll_create_node */
-        }
-        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
-            it_disposition(it, DISP_OPEN_OPEN) &&
-            !it_open_error(DISP_OPEN_OPEN, it)) {
-                it_set_disposition(it, DISP_ENQ_OPEN_REF);
-                ptlrpc_request_addref(request); /* balanced in ll_file_open */
-                /* BUG 11546 - eviction in the middle of open rpc processing */
-                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
-        }
+        memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
 
-        if (it->it_op & IT_CREAT) {
-                /* XXX this belongs in ll_create_it */
-        } else if (it->it_op == IT_OPEN) {
-                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
-        } else {
-                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
-        }
+        rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
+        GOTO(out, rc);
+out:
+        OBD_FREE_PTR(einfo);
+        minfo->mi_cb(exp, req, minfo, rc);
 
-        /* If we already have a matching lock, then cancel the new
-         * one.  We have to set the data here instead of in
-         * mdc_enqueue, because we need to use the child's inode as
-         * the l_ast_data to match, and that's not available until
-         * intent_finish has performed the iget().) */
-        lock = ldlm_handle2lock(&lockh);
-        if (lock) {
-                ldlm_policy_data_t policy = lock->l_policy_data;
-                LDLM_DEBUG(lock, "matching against this");
-                LDLM_LOCK_PUT(lock);
-                memcpy(&old_lock, &lockh, sizeof(lockh));
-                if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
-                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
-                        ldlm_lock_decref_and_cancel(&lockh,
-                                                    it->d.lustre.it_lock_mode);
-                        memcpy(&lockh, &old_lock, sizeof(old_lock));
-                        memcpy(&it->d.lustre.it_lock_handle, &lockh,
-                               sizeof(lockh));
-                }
+        return 0;
+}
+
+int mdc_intent_getattr_async(struct obd_export *exp,
+                             struct md_enqueue_info *minfo,
+                             struct ldlm_enqueue_info *einfo)
+{
+        struct mdc_op_data      *op_data = &minfo->mi_data;
+        struct lookup_intent    *it = &minfo->mi_it;
+        struct ptlrpc_request   *req;
+        struct obd_device       *obddev = class_exp2obd(exp);
+        struct ldlm_res_id       res_id = {
+                                        .name = {op_data->fid1.id,
+                                                 op_data->fid1.generation}
+                                 };
+        ldlm_policy_data_t       policy = {
+                                        .l_inodebits = { MDS_INODELOCK_LOOKUP }
+                                 };
+        struct mdc_enqueue_args *aa;
+        int                      rc;
+        int                      flags = LDLM_FL_HAS_INTENT;
+        ENTRY;
+
+        CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
+               op_data->namelen, op_data->name, op_data->fid1.id,
+               ldlm_it2str(it->it_op), it->it_flags);
+
+        req = mdc_intent_lookup_pack(exp, it, op_data);
+        if (!req)
+                RETURN(-ENOMEM);
+
+        mdc_enter_request(&obddev->u.cli);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
+                              0, NULL, &minfo->mi_lockh, 1);
+        if (rc < 0) {
+                mdc_exit_request(&obddev->u.cli); /* undo the enter above */
+                RETURN(rc);
         }
-        CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
-               op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
-               it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
 
-        RETURN(rc);
+        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); /* must fit */
+        aa = (struct mdc_enqueue_args *)&req->rq_async_args;
+        aa->ma_mi = minfo;
+        aa->ma_ei = einfo;
+        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
+        ptlrpcd_add_req(req); /* finished by the interpret callback */
+
+        RETURN(0);
 }
-EXPORT_SYMBOL(mdc_intent_lock);
+EXPORT_SYMBOL(mdc_intent_getattr_async);
diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c
index 0846aca06ec83c3e97cc966d345856bdf72772c6..f73c236af4f2539fff9871d37ca06eadb3723a87 100644
--- a/lustre/mdc/mdc_request.c
+++ b/lustre/mdc/mdc_request.c
@@ -108,6 +108,7 @@ static
 int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, 
                        unsigned int acl_size, struct ptlrpc_request *req)
 {
+        struct obd_device *obddev = class_exp2obd(exp);
         struct mds_body *body;
         void *eadata;
         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
@@ -128,7 +129,9 @@ int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
 
         ptlrpc_req_set_repsize(req, bufcount, size);
 
+        mdc_enter_request(&obddev->u.cli);
         rc = ptlrpc_queue_wait(req);
+        mdc_exit_request(&obddev->u.cli);
         if (rc != 0)
                 RETURN (rc);
 
@@ -236,6 +239,7 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid,
                      const char *input, int input_size, int output_size,
                      int flags, struct ptlrpc_request **request)
 {
+        struct obd_device *obddev = class_exp2obd(exp);
         struct ptlrpc_request *req;
         int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) };
         // int size[3] = {sizeof(struct mds_body)}, bufcnt = 1;
@@ -287,11 +291,15 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid,
         /* make rpc */
         if (opcode == MDS_SETXATTR)
                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+        else
+                mdc_enter_request(&obddev->u.cli);
 
         rc = ptlrpc_queue_wait(req);
 
         if (opcode == MDS_SETXATTR)
                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+        else
+                mdc_exit_request(&obddev->u.cli);
 
         if (rc != 0)
                 GOTO(err_out, rc);
diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh
index 32f627665f32f0ecced1cc08c2c37891f53a0b05..0474406c50c7afc0a8e376af2123c2ca120fdb69 100644
--- a/lustre/tests/sanity.sh
+++ b/lustre/tests/sanity.sh
@@ -84,6 +84,7 @@ init_test_env $@
 
 cleanup() {
 	echo -n "cln.."
+	pgrep ll_sa > /dev/null && { echo "There are ll_sa thread not exit!"; exit 20; } # abort with status 20 if any process matching "ll_sa" is still running
 	cleanupall ${FORCE} $* || { echo "FAILed to clean up"; exit 20; }
 }
 CLEANUP=${CLEANUP:-:}
@@ -4386,18 +4387,75 @@ test_122() { #bug #11544
 }
 run_test 122 "fail client bulk callback (shouldn't LBUG) ======="
 
+test_123() # statahead (bug 11401): compare "ls -l" time with and without statahead
+{
+        if [ -z "$(grep "processor.*: 1" /proc/cpuinfo)" ]; then
+                log "single core CPU, skipping test"  # && return
+        fi
+
+        mkdir -p $DIR/$tdir
+        # i = 1, 10, ..., 10000 is the file count reported for each round
+        for ((i=1, j=0; i<=10000; j=$i, i=$((i * 10)) )); do
+                createmany -o $DIR/$tdir/$tfile $j $((i - j))
+                # pass 1: time "ls -l" with statahead_max at its current value
+                grep '[0-9]' $LPROC/llite/*/statahead_max
+                cancel_lru_locks mdc
+                stime=`date +%s`
+                ls -l $DIR/$tdir > /dev/null
+                etime=`date +%s`
+                delta_sa=$((etime - stime))
+                echo "ls $i files with statahead:    $delta_sa sec"
+                # save statahead_max, print the stats, then write 0 to statahead_max
+                for client in $LPROC/llite/*; do
+                        max=`cat $client/statahead_max` # NOTE: single variable, the last client's value wins
+                        cat $client/statahead_stats
+                        echo 0 > $client/statahead_max
+                done
+                # pass 2: time the same listing with statahead_max set to 0
+                grep '[0-9]' $LPROC/llite/*/statahead_max
+                cancel_lru_locks mdc
+                stime=`date +%s`
+                ls -l $DIR/$tdir > /dev/null
+                etime=`date +%s`
+                delta=$((etime - stime))
+                echo "ls $i files without statahead: $delta sec"
+                # print the stats again and write the saved value back (same $LPROC tree as above)
+                for client in $LPROC/llite/*; do
+                        cat $client/statahead_stats
+                        echo $max > $client/statahead_max
+                done
+                # timestamps are whole seconds, so only a difference of >= 1 sec can fail
+                if [ $delta_sa -gt $delta ]; then
+                        error "ls $i files is slower with statahead!"
+                fi
+        done
+        echo "ls done"
+        # time the recursive removal of the test directory, including the sync
+        stime=`date +%s`
+        rm -r $DIR/$tdir
+        sync
+        etime=`date +%s`
+        delta=$((etime - stime))
+        echo "rm -r $DIR/$tdir/: $delta seconds"
+        echo "rm done"
+        cat $LPROC/llite/*/statahead_stats
+        # wait for commitment of removal
+        sleep 2
+}
+run_test 123 "verify statahead work"
+
 TMPDIR=$OLDTMPDIR
 TMP=$OLDTMP
 HOME=$OLDHOME
 
 log "cleanup: ======================================================"
 if [ "`mount | grep $MOUNT`" ]; then
-	rm -rf $DIR/[Rdfs][1-9]*
+    rm -rf $DIR/[Rdfs][1-9]*
 fi
 if [ "$I_MOUNTED" = "yes" ]; then
-	cleanupall -f || error "cleanup failed"
+    cleanupall -f || error "cleanup failed"
 else
-	sysctl -w lnet.debug="$OLDDEBUG" 2> /dev/null || true
+        sysctl -w lnet.debug="$OLDDEBUG" 2> /dev/null || true
 fi