From b63da8a5e33b4b0f88e2548a8f7aca7efef6b3e8 Mon Sep 17 00:00:00 2001
From: huanghua <huanghua>
Date: Mon, 3 Dec 2007 04:17:33 +0000
Subject: [PATCH] Branch HEAD b=14064

- readdir() for liblustre: i=nikita,fanyong
- flock for liblustre and various fixes for liblustre: i=umka,fanyong
---
 lustre/fid/fid_lib.c               |  26 ----
 lustre/include/lustre/lustre_idl.h |  26 +++-
 lustre/include/lustre_fid.h        |   3 -
 lustre/include/lustre_lite.h       |  19 +++
 lustre/liblustre/dir.c             | 183 ++++++++++++++++++-----------
 lustre/liblustre/file.c            |   3 +-
 lustre/liblustre/namei.c           |   4 +-
 lustre/liblustre/rw.c              |   7 ++
 lustre/liblustre/super.c           |  48 +++++---
 lustre/llite/dir.c                 |  19 ---
 lustre/lmv/lmv_internal.h          |   4 +
 lustre/mdt/mdt_handler.c           |   4 +-
 lustre/mdt/mdt_lib.c               |   1 -
 lustre/mdt/mdt_open.c              |  15 ++-
 lustre/obdfilter/filter.c          |   6 +-
 15 files changed, 221 insertions(+), 147 deletions(-)

diff --git a/lustre/fid/fid_lib.c b/lustre/fid/fid_lib.c
index 507439302b..71cd00e503 100644
--- a/lustre/fid/fid_lib.c
+++ b/lustre/fid/fid_lib.c
@@ -67,32 +67,6 @@ const struct lu_fid LUSTRE_BFL_FID = { .f_seq = 0x0000000000000003,
                                        .f_ver = 0x0000000000000000 };
 EXPORT_SYMBOL(LUSTRE_BFL_FID);
 
-void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src)
-{
-        /* check that all fields are converted */
-        CLASSERT(sizeof *src ==
-                 sizeof fid_seq(src) +
-                 sizeof fid_oid(src) + sizeof fid_ver(src));
-        LASSERTF(fid_is_igif(src) || fid_ver(src) == 0, DFID"\n", PFID(src));
-        dst->f_seq = cpu_to_le64(fid_seq(src));
-        dst->f_oid = cpu_to_le32(fid_oid(src));
-        dst->f_ver = cpu_to_le32(fid_ver(src));
-}
-EXPORT_SYMBOL(fid_cpu_to_le);
-
-void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src)
-{
-        /* check that all fields are converted */
-        CLASSERT(sizeof *src ==
-                 sizeof fid_seq(src) +
-                 sizeof fid_oid(src) + sizeof fid_ver(src));
-        dst->f_seq = le64_to_cpu(fid_seq(src));
-        dst->f_oid = le32_to_cpu(fid_oid(src));
-        dst->f_ver = le32_to_cpu(fid_ver(src));
-        LASSERTF(fid_is_igif(dst) || fid_ver(dst) == 0, DFID"\n", PFID(dst));
-}
-EXPORT_SYMBOL(fid_le_to_cpu);
-
 void range_cpu_to_le(struct lu_range *dst, const struct lu_range *src)
 {
         /* check that all fields are converted */
diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h
index 2a39b4820e..2871e4295f 100644
--- a/lustre/include/lustre/lustre_idl.h
+++ b/lustre/include/lustre/lustre_idl.h
@@ -251,7 +251,30 @@ static inline int fid_is_igif(const struct lu_fid *fid)
         fid_oid(fid), \
         fid_ver(fid)
 
-#ifdef __KERNEL__
+static inline void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src)
+{
+        /* check that all fields are converted */
+        CLASSERT(sizeof *src ==
+                 sizeof fid_seq(src) +
+                 sizeof fid_oid(src) + sizeof fid_ver(src));
+        LASSERTF(fid_is_igif(src) || fid_ver(src) == 0, DFID"\n", PFID(src));
+        dst->f_seq = cpu_to_le64(fid_seq(src));
+        dst->f_oid = cpu_to_le32(fid_oid(src));
+        dst->f_ver = cpu_to_le32(fid_ver(src));
+}
+
+static inline void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src)
+{
+        /* check that all fields are converted */
+        CLASSERT(sizeof *src ==
+                 sizeof fid_seq(src) +
+                 sizeof fid_oid(src) + sizeof fid_ver(src));
+        dst->f_seq = le64_to_cpu(fid_seq(src));
+        dst->f_oid = le32_to_cpu(fid_oid(src));
+        dst->f_ver = le32_to_cpu(fid_ver(src));
+        LASSERTF(fid_is_igif(dst) || fid_ver(dst) == 0, DFID"\n", PFID(dst));
+}
+
 static inline void fid_cpu_to_be(struct lu_fid *dst, const struct lu_fid *src)
 {
         /* check that all fields are converted */
@@ -276,6 +299,7 @@ static inline void fid_be_to_cpu(struct lu_fid *dst, const struct lu_fid *src)
         LASSERTF(fid_is_igif(dst) || fid_ver(dst) == 0, DFID"\n", PFID(dst));
 }
 
+#ifdef __KERNEL__
 /*
  * Storage representation for fids.
  *
diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h
index a4dec46756..d47304d2ac 100644
--- a/lustre/include/lustre_fid.h
+++ b/lustre/include/lustre_fid.h
@@ -195,9 +195,6 @@ int seq_client_alloc_fid(struct lu_client_seq *seq,
 /* Fids common stuff */
 int fid_is_local(struct lu_site *site, const struct lu_fid *fid);
 
-void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src);
-void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src);
-
 /* fid locking */
 
 struct ldlm_namespace;
diff --git a/lustre/include/lustre_lite.h b/lustre/include/lustre_lite.h
index 8821a75afc..f2c37bf893 100644
--- a/lustre/include/lustre_lite.h
+++ b/lustre/include/lustre_lite.h
@@ -137,4 +137,23 @@ static inline int ll_ocd_update(struct obd_device *host,
         RETURN(result);
 }
 
+/*      
+ * Chain of hash overflow pages.
+ */            
+struct ll_dir_chain {
+        /* XXX something. Later */
+};
+        
+static inline void ll_dir_chain_init(struct ll_dir_chain *chain)
+{       
+}
+
+static inline void ll_dir_chain_fini(struct ll_dir_chain *chain)
+{
+}
+
+static inline __u32 hash_x_index(__u32 value)
+{
+        return ((__u32)~0) - value;
+}
 #endif
diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c
index 88c0f3edf0..fdeabf373c 100644
--- a/lustre/liblustre/dir.c
+++ b/lustre/liblustre/dir.c
@@ -62,19 +62,21 @@
 
 #include "llite_lib.h"
 
+/* (new) readdir implementation overview can be found in lustre/llite/dir.c */
+
 static int llu_dir_do_readpage(struct inode *inode, struct page *page)
 {
         struct llu_inode_info *lli = llu_i2info(inode);
-        struct intnl_stat *st = llu_i2stat(inode);
-        struct llu_sb_info *sbi = llu_i2sbi(inode);
-        __u64 offset;
-        int rc = 0;
+        struct intnl_stat     *st = llu_i2stat(inode);
+        struct llu_sb_info    *sbi = llu_i2sbi(inode);
         struct ptlrpc_request *request;
-        struct lustre_handle lockh;
-        struct mdt_body *body;
-        struct lookup_intent it = { .it_op = IT_READDIR };
-        struct md_op_data op_data;
+        struct lustre_handle   lockh;
+        struct mdt_body       *body;
+        struct lookup_intent   it = { .it_op = IT_READDIR };
+        struct md_op_data      op_data = {{ 0 }};
         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
+        __u64 offset;
+        int rc = 0;
         ENTRY;
 
         rc = md_lock_match(sbi->ll_md_exp, LDLM_FL_BLOCK_GRANTED,
@@ -99,7 +101,7 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
         }
         ldlm_lock_dump_handle(D_OTHER, &lockh);
 
-        offset = (__u64)page->index << CFS_PAGE_SHIFT;
+        offset = (__u64)hash_x_index(page->index);
         rc = md_readpage(sbi->ll_md_exp, &lli->lli_fid, NULL,
                          offset, page, &request);
         if (!rc) {
@@ -109,7 +111,8 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
                 /* swabbed by md_readpage() */
                 LASSERT(lustre_rep_swabbed(request, REPLY_REC_OFF));
 
-                st->st_size = body->size;
+                if (body->valid & OBD_MD_FLSIZE)
+                        st->st_size = body->size;
         } else {
                 CERROR("read_dir_page(%ld) error %d\n", page->index, rc);
         }
@@ -120,7 +123,8 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
         return rc;
 }
 
-static struct page *llu_dir_read_page(struct inode *ino, unsigned long pgidx)
+static struct page *llu_dir_read_page(struct inode *ino, __u32 hash,
+                                      int exact, struct ll_dir_chain *chain)
 {
         struct page *page;
         int rc;
@@ -129,7 +133,7 @@ static struct page *llu_dir_read_page(struct inode *ino, unsigned long pgidx)
         OBD_PAGE_ALLOC(page, 0);
         if (!page)
                 RETURN(ERR_PTR(-ENOMEM));
-        page->index = pgidx;
+        page->index = hash_x_index(hash);
 
         rc = llu_dir_do_readpage(ino, page);
         if (rc) {
@@ -165,7 +169,6 @@ static unsigned char ext2_filetype_table[EXT2_FT_MAX] = {
 
 #define NAME_OFFSET(de) ((int) ((de)->d_name - (char *) (de)))
 #define ROUND_UP64(x)   (((x)+sizeof(__u64)-1) & ~(sizeof(__u64)-1))
-
 static int filldir(char *buf, int buflen,
                    const char *name, int namelen, loff_t offset,
                    ino_t ino, unsigned int d_type, int *filled)
@@ -191,14 +194,23 @@ static int filldir(char *buf, int buflen,
         return 0;
 }
 
-ssize_t llu_iop_filldirentries(struct inode *ino, _SYSIO_OFF_T *basep, 
+/* 
+ * TODO: much of the code here is similar/identical to llite ll_readdir().
+ * These code can be factored out and shared in a common module.
+ */
+
+ssize_t llu_iop_filldirentries(struct inode *dir, _SYSIO_OFF_T *basep, 
 			       char *buf, size_t nbytes)
 {
-        struct llu_inode_info *lli = llu_i2info(ino);
-        struct intnl_stat *st = llu_i2stat(ino);
-        loff_t pos = *basep, offset;
-        unsigned long maxpages, pgidx;
+        struct llu_inode_info *lli = llu_i2info(dir);
+        struct intnl_stat     *st = llu_i2stat(dir);
+        loff_t                 pos = *basep;
+        struct ll_dir_chain    chain;
+        struct page           *page;
         int filled = 0;
+        int rc;
+        int done;
+        int shift;
         ENTRY;
 
         liblustre_wait_event(0);
@@ -208,62 +220,99 @@ ssize_t llu_iop_filldirentries(struct inode *ino, _SYSIO_OFF_T *basep,
                 RETURN(0);
         }
 
-        if (pos == -1)
-                pos = lli->lli_dir_pos;
-
-        maxpages = (st->st_size + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-        pgidx = pos >> CFS_PAGE_SHIFT;
-        offset = pos & ~CFS_PAGE_MASK;
-
-        for ( ; pgidx < maxpages ; pgidx++, offset = 0) {
-                struct page *page;
-                struct ext2_dirent *de;
-                char *addr, *limit;
-
-                page = llu_dir_read_page(ino, pgidx);
-                if (IS_ERR(page))
-                        continue;
-
-                /* size might have been updated by md_readpage */
-                maxpages = (st->st_size + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-
-                /* fill in buffer */
-                addr = page->addr;
-                limit = addr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
-                de = (struct ext2_dirent *) (addr + offset);
-
-                for ( ; (char*) de <= limit; de = ext2_next_entry(de)) {
-                        if (de->inode) {
-                                int over;
-                                unsigned char d_type = DT_UNKNOWN;
-
-                                if (de->file_type < EXT2_FT_MAX)
-                                        d_type = ext2_filetype_table[de->file_type];
-
-                                offset = (char*) de - addr;
-                                over =  filldir(buf, nbytes, de->name, de->name_len,
-                                                (((__u64)pgidx << PAGE_SHIFT) | offset) +
-                                                le16_to_cpu(de->rec_len),
-                                                le32_to_cpu(de->inode), d_type, &filled);
-                                if (over) {
-                                        OBD_PAGE_FREE(page);
+        if (pos == DIR_END_OFF)
+                /*
+                 * end-of-file.
+                 */
+                RETURN(0);
+
+        rc    = 0;
+        done  = 0;
+        shift = 0;
+        ll_dir_chain_init(&chain);
+
+        page = llu_dir_read_page(dir, pos, 0, &chain);
+        while (rc == 0 && !done) {
+                struct lu_dirpage *dp;
+                struct lu_dirent  *ent;
+
+                if (!IS_ERR(page)) {
+                        /* 
+                         * If page is empty (end of directoryis reached),
+                         * use this value. 
+                         */
+                        __u32 hash = DIR_END_OFF;
+                        __u32 next;
+
+                        dp = page->addr;
+                        for (ent = lu_dirent_start(dp); ent != NULL && !done;
+                             ent = lu_dirent_next(ent)) {
+                                char          *name;
+                                int            namelen;
+                                struct lu_fid  fid;
+                                ino_t          ino;
+
+                                hash    = le32_to_cpu(ent->lde_hash);
+                                namelen = le16_to_cpu(ent->lde_namelen);
+
+                                if (hash < pos)
+                                        /*
+                                         * Skip until we find target hash
+                                         * value.
+                                         */
+                                        continue;
+
+                                if (namelen == 0)
+                                        /*
+                                         * Skip dummy record.
+                                         */
+                                        continue;
+
+                                fid  = ent->lde_fid;
+                                name = ent->lde_name;
+                                fid_le_to_cpu(&fid, &fid);
+                                ino  = llu_fid_build_ino(llu_i2sbi(dir), &fid);
+
+                                done = filldir(buf, nbytes, name, namelen,
+                                               (loff_t)hash, ino, DT_UNKNOWN,
+                                               &filled);
+                        }
+                        next = le32_to_cpu(dp->ldp_hash_end);
+                        OBD_PAGE_FREE(page);
+                        if (!done) {
+                                pos = next;
+                                if (pos == DIR_END_OFF)
+                                        /*
+                                         * End of directory reached.
+                                         */
+                                        done = 1;
+                                else if (1 /* chain is exhausted*/)
                                         /*
-                                         * if buffer overflow with no data
-                                         * returned yet, then report error
-                                         * instead of eof
+                                         * Normal case: continue to the next
+                                         * page.
+                                         */
+                                        page = llu_dir_read_page(dir, pos, 1,
+                                                               &chain);
+                                else {
+                                        /*
+                                         * go into overflow page.
                                          */
-                                        if (filled == 0)
-                                                RETURN(-EINVAL);
-                                        GOTO(done, 0);
                                 }
+                        } else {
+                                pos = hash;
+                                if (filled == 0)
+                                        GOTO(out, filled = -EINVAL);
                         }
+                } else {
+                        rc = PTR_ERR(page);
+                        CERROR("error reading dir "DFID" at %lu: rc %d\n",
+                               PFID(&lli->lli_fid), (unsigned long)pos, rc);
                 }
-                
-                OBD_PAGE_FREE(page);
         }
-done:
-        lli->lli_dir_pos = pgidx << CFS_PAGE_SHIFT | offset;
+        lli->lli_dir_pos = (loff_t)(__s32)pos;
         *basep = lli->lli_dir_pos;
+out:
+        ll_dir_chain_fini(&chain);
         liblustre_wait_event(0);
         RETURN(filled);
 }
diff --git a/lustre/liblustre/file.c b/lustre/liblustre/file.c
index 891bdf7d90..cf5267bac9 100644
--- a/lustre/liblustre/file.c
+++ b/lustre/liblustre/file.c
@@ -77,7 +77,6 @@ void llu_prep_md_op_data(struct md_op_data *op_data, struct inode *i1,
 {
         LASSERT(i1 != NULL || i2 != NULL);
         LASSERT(op_data);
-        memset(op_data, 0, sizeof(*op_data));
 
         if (i1) {
                 ll_i2gids(op_data->op_suppgids, i1, i2);
@@ -324,7 +323,7 @@ int llu_sizeonmds_update(struct inode *inode, struct md_open_data *mod,
 {
         struct llu_inode_info *lli = llu_i2info(inode);
         struct llu_sb_info *sbi = llu_i2sbi(inode);
-        struct md_op_data op_data;
+        struct md_op_data op_data = {{ 0 }};
         struct obdo oa;
         int rc;
         ENTRY;
diff --git a/lustre/liblustre/namei.c b/lustre/liblustre/namei.c
index 1fcb482e25..ad90650bac 100644
--- a/lustre/liblustre/namei.c
+++ b/lustre/liblustre/namei.c
@@ -206,7 +206,7 @@ static int llu_pb_revalidate(struct pnode *pnode, int flags,
 {
         struct pnode_base *pb = pnode->p_base;
         struct it_cb_data icbd;
-        struct md_op_data op_data;
+        struct md_op_data op_data = {{ 0 }};
         struct ptlrpc_request *req = NULL;
         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
         struct obd_export *exp;
@@ -402,7 +402,7 @@ struct inode *llu_inode_from_lock(struct ldlm_lock *lock)
 static int llu_lookup_it(struct inode *parent, struct pnode *pnode,
                          struct lookup_intent *it, int flags)
 {
-        struct md_op_data op_data;
+        struct md_op_data op_data = {{ 0 }};
         struct it_cb_data icbd;
         struct ptlrpc_request *req = NULL;
         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c
index d7afd798cc..45f5097d0e 100644
--- a/lustre/liblustre/rw.c
+++ b/lustre/liblustre/rw.c
@@ -451,12 +451,18 @@ static int llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
         RETURN(0);
 }
 
+static struct obd_capa * llu_ap_lookup_capa(void *data, int cmd)
+{
+        return NULL;
+}
+
 static struct obd_async_page_ops llu_async_page_ops = {
         .ap_make_ready =        NULL,
         .ap_refresh_count =     NULL,
         .ap_fill_obdo =         llu_ap_fill_obdo,
         .ap_update_obdo =       llu_ap_update_obdo,
         .ap_completion =        llu_ap_completion,
+        .ap_lookup_capa =       llu_ap_lookup_capa,
 };
 
 static int llu_queue_pio(int cmd, struct llu_io_group *group,
@@ -528,6 +534,7 @@ static int llu_queue_pio(int cmd, struct llu_io_group *group,
                         llap->llap_cookie = NULL;
                         RETURN(rc);
                 }
+
                 CDEBUG(D_CACHE, "llap %p page %p group %p obj off "LPU64"\n",
                        llap, page, llap->llap_cookie,
                        (obd_off)pages->index << CFS_PAGE_SHIFT);
diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c
index ad759d7273..3dc931bae0 100644
--- a/lustre/liblustre/super.c
+++ b/lustre/liblustre/super.c
@@ -339,7 +339,7 @@ int llu_inode_getattr(struct inode *inode, struct obdo *obdo)
         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                                OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
-                               OBD_MD_FLCTIME;
+                               OBD_MD_FLCTIME | OBD_MD_FLGROUP;
 
         set = ptlrpc_prep_set();
         if (set == NULL) {
@@ -865,7 +865,8 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr)
                 CDEBUG(D_INODE, "set mtime on OST inode %llu to %lu\n",
                        (long long)st->st_ino, LTIME_S(attr->ia_mtime));
                 oa.o_id = lsm->lsm_object_id;
-                oa.o_valid = OBD_MD_FLID;
+                oa.o_gr = lsm->lsm_object_gr;
+                oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
 
                 obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME);
@@ -946,7 +947,7 @@ static int llu_iop_symlink_raw(struct pnode *pno, const char *tgt)
         int len = qstr->len;
         struct ptlrpc_request *request = NULL;
         struct llu_sb_info *sbi = llu_i2sbi(dir);
-        struct md_op_data op_data;
+        struct md_op_data op_data = {{ 0 }};
         int err = -EMLINK;
         ENTRY;
 
@@ -1061,7 +1062,7 @@ static int llu_iop_mknod_raw(struct pnode *pno,
         struct ptlrpc_request *request = NULL;
         struct inode *dir = pno->p_parent->p_base->pb_ino;
         struct llu_sb_info *sbi = llu_i2sbi(dir);
-        struct md_op_data op_data;
+        struct md_op_data op_data = {{ 0 }};
         int err = -EMLINK;
         ENTRY;
 
@@ -1108,7 +1109,7 @@ static int llu_iop_link_raw(struct pnode *old, struct pnode *new)
         const char *name = new->p_base->pb_name.name;
         int namelen = new->p_base->pb_name.len;
         struct ptlrpc_request *request = NULL;
-        struct md_op_data op_data;
+        struct md_op_data op_data = {{ 0 }};
         int rc;
         ENTRY;
 
@@ -1136,7 +1137,7 @@ static int llu_iop_unlink_raw(struct pnode *pno)
         int len = qstr->len;
         struct inode *target = pno->p_base->pb_ino;
         struct ptlrpc_request *request = NULL;
-        struct md_op_data op_data;
+        struct md_op_data op_data = { { 0 } };
         int rc;
         ENTRY;
 
@@ -1163,7 +1164,7 @@ static int llu_iop_rename_raw(struct pnode *old, struct pnode *new)
         const char *newname = new->p_base->pb_name.name;
         int newnamelen = new->p_base->pb_name.len;
         struct ptlrpc_request *request = NULL;
-        struct md_op_data op_data;
+        struct md_op_data op_data = { { 0 } };
         int rc;
         ENTRY;
 
@@ -1307,7 +1308,7 @@ static int llu_iop_mkdir_raw(struct pnode *pno, mode_t mode)
         int len = qstr->len;
         struct ptlrpc_request *request = NULL;
         struct intnl_stat *st = llu_i2stat(dir);
-        struct md_op_data op_data;
+        struct md_op_data op_data = {{ 0 }};
         int err = -EMLINK;
         ENTRY;
 
@@ -1336,7 +1337,7 @@ static int llu_iop_rmdir_raw(struct pnode *pno)
         const char *name = qstr->name;
         int len = qstr->len;
         struct ptlrpc_request *request = NULL;
-        struct md_op_data op_data;
+        struct md_op_data op_data = {{ 0 }};
         int rc;
         ENTRY;
 
@@ -1434,13 +1435,26 @@ static int llu_file_flock(struct inode *ino,
                 LBUG();
         }
 
-        CDEBUG(D_DLMTRACE, "inode=%llu, pid=%u, flags=%#x, mode=%u, "
-               "start="LPU64", end="LPU64"\n", (unsigned long long)st->st_ino,
-               flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start,
+        CDEBUG(D_DLMTRACE, "inode=%llu, pid=%u, cmd=%d, flags=%#x, mode=%u, "
+               "start="LPX64", end="LPX64"\n", (unsigned long long)st->st_ino,
+               flock.l_flock.pid, cmd, flags, einfo.ei_mode, flock.l_flock.start,
                flock.l_flock.end);
 
-        rc = ldlm_cli_enqueue(llu_i2mdexp(ino), NULL, &einfo, &res_id, 
-                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
+        {
+                struct lmv_obd *lmv;
+                struct obd_device *lmv_obd;
+                lmv_obd = class_exp2obd(llu_i2mdexp(ino));
+                lmv = &lmv_obd->u.lmv;
+
+                if (lmv->desc.ld_tgt_count < 1)
+                        RETURN(rc = -ENODEV);
+                
+                if (lmv->tgts[0].ltd_exp != NULL)
+                        rc = ldlm_cli_enqueue(lmv->tgts[0].ltd_exp, NULL, &einfo, &res_id, 
+                                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
+                else
+                        rc = -ENODEV;
+        }
         RETURN(rc);
 }
 
@@ -1684,7 +1698,7 @@ static int llu_lov_dir_setstripe(struct inode *ino, unsigned long arg)
 {
         struct llu_sb_info *sbi = llu_i2sbi(ino);
         struct ptlrpc_request *request = NULL;
-        struct md_op_data op_data;
+        struct md_op_data op_data = {{ 0 }};
         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
         int rc = 0;
 
@@ -1731,7 +1745,7 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags,
 
         struct ptlrpc_request *req = NULL;
         struct lustre_md md;
-        struct md_op_data data;
+        struct md_op_data data = {{ 0 }};
         struct lustre_handle lockh;
         int rc = 0;
         ENTRY;
@@ -1755,7 +1769,7 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags,
         lli2->lli_symlink_name = NULL;
         ino->i_private = lli2;
 
-        llu_prep_md_op_data(&data, NULL, ino, NULL, 0, O_RDWR, 
+        llu_prep_md_op_data(&data, NULL, ino, NULL, 0, O_RDWR,
                             LUSTRE_OPC_ANY);
 
         rc = md_enqueue(sbi->ll_md_exp, &einfo, &oit, &data,
diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c
index 42c2da66cc..290ae72e3e 100644
--- a/lustre/llite/dir.c
+++ b/lustre/llite/dir.c
@@ -129,10 +129,6 @@
  *
  */
 
-static __u32 hash_x_index(__u32 value)
-{
-        return ((__u32)~0) - value;
-}
 #ifdef HAVE_PG_FS_MISC
 #define PageChecked(page)        test_bit(PG_fs_misc, &(page)->flags)
 #define SetPageChecked(page)     set_bit(PG_fs_misc, &(page)->flags)
@@ -261,21 +257,6 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
         return page;
 }
 
-/*
- * Chain of hash overflow pages.
- */
-struct ll_dir_chain {
-        /* XXX something. Later */
-};
-
-static void ll_dir_chain_init(struct ll_dir_chain *chain)
-{
-}
-
-static void ll_dir_chain_fini(struct ll_dir_chain *chain)
-{
-}
-
 static struct page *ll_get_dir_page(struct inode *dir, __u32 hash, int exact,
                                     struct ll_dir_chain *chain)
 {
diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h
index c8b28b4e8b..4e8c721e9b 100644
--- a/lustre/lmv/lmv_internal.h
+++ b/lustre/lmv/lmv_internal.h
@@ -166,6 +166,10 @@ lmv_get_mea(struct ptlrpc_request *req, int offset)
 
 	if (mea->mea_count == 0)
 		return NULL;
+        if( mea->mea_magic != MEA_MAGIC_LAST_CHAR &&
+                mea->mea_magic != MEA_MAGIC_ALL_CHARS &&
+                mea->mea_magic != MEA_MAGIC_HASH_SEGMENT)
+                return NULL;
 	
 	return mea;
 }
diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c
index 0827ff49b0..c9db9251d0 100644
--- a/lustre/mdt/mdt_handler.c
+++ b/lustre/mdt/mdt_handler.c
@@ -1643,7 +1643,9 @@ static int mdt_enqueue(struct mdt_thread_info *info)
          * bits get corrupted somewhere in mdt_intent_policy().
          */
         req_bits = info->mti_dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
-        LASSERT(req_bits != 0);
+        /* This is disabled because we need to support liblustre flock.
+         * LASSERT(req_bits != 0);
+         */
 
         rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
                                   req, info->mti_dlm_req, &cbs);
diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c
index bc04b2be0b..28500c9c0f 100644
--- a/lustre/mdt/mdt_lib.c
+++ b/lustre/mdt/mdt_lib.c
@@ -1172,7 +1172,6 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         if (rr->rr_name == NULL)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
-        LASSERT(rr->rr_namelen > 0);
 
         sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
                                                      RCL_CLIENT);
diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c
index f5f6b8e429..05c83c707b 100644
--- a/lustre/mdt/mdt_open.c
+++ b/lustre/mdt/mdt_open.c
@@ -745,7 +745,6 @@ static int mdt_open_by_fid(struct mdt_thread_info* info,
         int                      rc;
         ENTRY;
 
-        LASSERT(info->mti_spec.u.sp_ea.no_lov_create);
         o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
         if (IS_ERR(o))
                 RETURN(rc = PTR_ERR(o));
@@ -758,7 +757,7 @@ static int mdt_open_by_fid(struct mdt_thread_info* info,
 
                 rc = mo_attr_get(env, mdt_object_child(o), ma);
                 if (rc == 0)
-                        rc = mdt_mfd_open(info, NULL, o, flags, 0);
+                        rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
         } else if (rc == 0) {
                 rc = -ENOENT;
         } else  {
@@ -872,13 +871,17 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                PFID(rr->rr_fid2), create_flags,
                ma->ma_attr.la_mode, lustre_msg_get_flags(req->rq_reqmsg));
 
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
-                /* This is a replay request. */
+        if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) ||
+            (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
+                /* This is a replay request or from liblustre with ea. */
                 result = mdt_open_by_fid(info, ldlm_rep);
 
-                if (result != -ENOENT)
+                if (result != -ENOENT) {
+                        if (req->rq_export->exp_libclient &&
+                            create_flags&MDS_OPEN_HAS_EA)
+                                GOTO(out, result = 0);
                         GOTO(out, result);
-
+                }
                 /*
                  * We didn't find the correct object, so we need to re-create it
                  * via a regular replay.
diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c
index 445e315fd2..0f9009195f 100644
--- a/lustre/obdfilter/filter.c
+++ b/lustre/obdfilter/filter.c
@@ -1388,6 +1388,8 @@ struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
 
         if (IS_ERR(dparent))
                 return dparent;
+        if (dparent == NULL)
+                return ERR_PTR(-ENOENT);
 
         rc = filter_lock_dentry(obd, dparent);
         fsfilt_check_slow(obd, now, obd_timeout, "parent lock");
@@ -2772,8 +2774,8 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
         dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
 
         if (IS_ERR(dchild)) {
-                CERROR("%s error looking up object: "LPU64"\n",
-                       what, oa->o_id);
+                CERROR("%s error looking up object: "LPU64":"LPU64"\n",
+                       what, group, oa->o_id);
                 RETURN(dchild);
         }
 
-- 
GitLab