diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h
index 98bb2d397373b8ec7c745f9c6e4d28a5d6370181..5384a68c014e0b6e6fa2922360d044b8bf71c1e7 100644
--- a/lustre/include/linux/obd_class.h
+++ b/lustre/include/linux/obd_class.h
@@ -1350,6 +1350,7 @@ void class_init_uuidlist(void);
 void class_exit_uuidlist(void);
 
 /* mea.c */
-int mea_name2idx(struct mea *mea, char *name, int namelen);
+int mea_name2idx(struct mea *, char *, int);
+int raw_name2idx(int, const char *, int);
 
 #endif /* __LINUX_OBD_CLASS_H */
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 37868ca32089b74fb18b65aa9917689591cad7e1..914a06a8282d7c09e0ffba414aeb4627e20412e3 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -354,7 +354,9 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 }
 
                 if (reply->lock_desc.l_resource.lr_name.name[0] !=
-                    lock->l_resource->lr_name.name[0]) {
+                    lock->l_resource->lr_name.name[0] ||
+                   reply->lock_desc.l_resource.lr_name.name[1] !=
+                    lock->l_resource->lr_name.name[1]) {
                         CDEBUG(D_INFO, "remote intent success, locking %ld "
                                "instead of %ld\n",
                               (long)reply->lock_desc.l_resource.lr_name.name[0],
diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c
index 03b8f3b4433f13cfe8053f4809d160fe91d1c0c0..81e4baa1d1d63c4ce626231673d54c6c888a0a29 100644
--- a/lustre/lmv/lmv_intent.c
+++ b/lustre/lmv/lmv_intent.c
@@ -72,6 +72,12 @@ int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt,
                 struct lustre_handle plock;
                 int pmode;
 
+                if (it->it_op == IT_LOOKUP) {
+                        /* unfortunately, we have to lie to MDC/MDS to
+                         * retrieve attributes llite needs */
+                        it->it_op = IT_GETATTR;
+                }
+
                 /* we got LOOKUP lock, but we really need attrs */
                 pmode = it->d.lustre.it_lock_mode;
                 if (pmode) {
@@ -171,10 +177,10 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
                 rc = lmv_revalidate_slaves(exp, reqp, cfid,
                                 it, 1, cb_blocking);
         } else if (S_ISDIR(body->mode)) {
-                CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
+                /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
                                 (unsigned long) cfid->mds,
                                 (unsigned long) cfid->id,
-                                (unsigned long) cfid->generation);
+                                (unsigned long) cfid->generation);*/
         }
         lmv_put_obj(obj);
         RETURN(rc);
@@ -433,17 +439,25 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
          * cfid != NULL specifies revalidation */
 
         if (cfid) {
-                /* this is revalidation  during revalidation it's
-                 * enough to return 1 if we think attrs are uptodate
-                 * it may return updated attrs, though */
-                mds = cfid->mds;
+                /* this is revalidation: we have to check is LOOKUP
+                 * lock still valid for given fid. very important
+                 * part is that we have to choose right mds because
+                 * namespace is per mds */
+                rpfid = *pfid;
+                obj = lmv_grab_obj(obd, pfid, 0);
+                if (obj) {
+                        mds = raw_name2idx(obj->objcount, (char *) name, len);
+                        rpfid = obj->objs[mds].fid;
+                        lmv_put_obj(obj);
+                }
+                mds = rpfid.mds;
+                CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
+                       (unsigned long) cfid->mds,
+                       (unsigned long) cfid->id,
+                       (unsigned long) cfid->generation, mds);
                 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name,
                                     len, lmm, lmmsize, cfid, it, flags,
                                     reqp, cb_blocking);
-                CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu = %d\n",
-                       (unsigned long) cfid->mds,
-                       (unsigned long) cfid->id,
-                       (unsigned long) cfid->generation, rc);
                 RETURN(rc);
         }
 
diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c
index 89bd1c20a9ff408cc32913e0d1e94de22befea7a..4932ea878e2d0ad8e31c5de9f22f274a1aa465ee 100644
--- a/lustre/lmv/lmv_obd.c
+++ b/lustre/lmv/lmv_obd.c
@@ -33,6 +33,7 @@
 #else
 #include <liblustre.h>
 #endif
+#include <linux/ext2_fs.h>
 
 #include <linux/obd_support.h>
 #include <linux/lustre_lib.h>
@@ -457,7 +458,8 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
         struct lmv_obd *lmv = &obd->u.lmv;
         struct mea *mea = op_data->mea1;
         struct mds_body *mds_body;
-        int rc, i, free_mea = 0;
+        int rc, i, mds, free_mea = 0;
+        struct lmv_obj *obj;
         ENTRY;
         lmv_connect(obd);
         /* TODO: where to create new directories?
@@ -465,17 +467,21 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
          * but we lookup by name may forward any request in slave
          */
 repeat:
-        i = mea_name2idx(mea, (char *) op_data->name, op_data->namelen);
-        if (mea)
-                op_data->fid1 = mea->mea_fids[i];
+        obj = lmv_grab_obj(obd, &op_data->fid1, 0);
+        if (obj) {
+                mds = raw_name2idx(obj->objcount, op_data->name,
+                                        op_data->namelen - 1);
+                op_data->fid1 = obj->objs[mds].fid;
+                lmv_put_obj(obj);
+        }
 
         CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu (mea 0x%p)\n",
                         op_data->namelen, op_data->name,
                         (unsigned long) op_data->fid1.mds,
                         (unsigned long) op_data->fid1.id,
                         (unsigned long) op_data->fid1.generation, mea);
-        rc = md_create(lmv->tgts[i].exp, op_data, data, datalen,
-                            mode, uid, gid, rdev, request);
+        rc = md_create(lmv->tgts[op_data->fid1.mds].exp, op_data, data,
+                       datalen, mode, uid, gid, rdev, request);
         if (rc == 0) {
                 if (*request == NULL)
                      RETURN(rc);
@@ -484,13 +490,16 @@ repeat:
                 LASSERT(mds_body != NULL);
                 CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, mds = %d\n",
                        (unsigned long) mds_body->fid1.id,
-                       (unsigned long) mds_body->fid1.generation, i);
-                LASSERT(mds_body->mds == i);
+                       (unsigned long) mds_body->fid1.generation,
+                       op_data->fid1.mds);
+                LASSERT(mds_body->valid & OBD_MD_MDS ||
+                                mds_body->mds == op_data->fid1.mds);
         } else if (rc == -ESTALE) {
                 struct ptlrpc_request *req = NULL;
                 struct lustre_md md;
                 int mealen;
                 
+                LBUG(); /* FIXME ASAP */
                 CDEBUG(D_OTHER, "it seems MDS splitted dir\n");
                 LASSERT(mea == NULL);
 
@@ -597,30 +606,33 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        struct mea *mea = data->mea2;
-        int rc, i;
+        struct lmv_obj *obj;
+        int rc;
         ENTRY;
         lmv_connect(obd);
         if (data->namelen != 0) {
                 /* usual link request */
-                i = mea_name2idx(mea, (char *) data->name, data->namelen);
-                if (mea)
-                        data->fid2 = mea->mea_fids[i];
-                CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d mea %p\n",
+                obj = lmv_grab_obj(obd, &data->fid1, 0);
+                if (obj) {
+                        rc = raw_name2idx(obj->objcount, data->name,
+                                         data->namelen);
+                        data->fid1 = obj->objs[rc].fid;
+                        lmv_put_obj(obj);
+                }
+                CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d\n",
                        (unsigned) data->fid2.mds, (unsigned) data->fid2.id,
                        (unsigned) data->fid2.generation, data->namelen,
                        data->name, (unsigned) data->fid1.mds,
                        (unsigned) data->fid1.id,
-                       (unsigned) data->fid1.generation, i, mea);
+                       (unsigned) data->fid1.generation, data->fid1.mds);
         } else {
                 /* request from MDS to acquire i_links for inode by fid1 */
-                i = data->fid1.mds;
                 CDEBUG(D_OTHER, "inc i_nlinks for %u/%u/%u\n",
                        (unsigned) data->fid1.mds, (unsigned) data->fid1.id,
                        (unsigned) data->fid1.generation);
         }
                         
-        rc = md_link(lmv->tgts[i].exp, data, request);
+        rc = md_link(lmv->tgts[data->fid1.mds].exp, data, request);
         RETURN(rc);
 }
 
@@ -791,6 +803,23 @@ int lmv_dirobj_blocking_ast(struct ldlm_lock *lock,
         RETURN(0);
 }
 
+void lmv_remove_dots(struct page *page)
+{
+        char *kaddr = page_address(page);
+        unsigned limit = PAGE_CACHE_SIZE;
+        unsigned offs, rec_len;
+        struct ext2_dir_entry_2 *p;
+
+        for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
+                p = (struct ext2_dir_entry_2 *)(kaddr + offs);
+                rec_len = le16_to_cpu(p->rec_len);
+
+                if ((p->name_len == 1 && p->name[0] == '.') ||
+                    (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
+                        p->inode = 0;
+        }
+}
+
 int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
                  __u64 offset, struct page *page,
                  struct ptlrpc_request **request)
@@ -826,11 +855,15 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
                        (unsigned long) offset);
         }
         rc = md_readpage(lmv->tgts[rfid.mds].exp, &rfid, offset, page, request);
+        if (rc == 0 && !fid_equal(&rfid, mdc_fid)) {
+                /* this page isn't from master object. to avoid
+                 * ./.. duplication in directory, we have to remove them
+                 * from all slave objects */
+                lmv_remove_dots(page);
+        }
       
         lmv_put_obj(obj);
 
-#warning "we need fix for duplicate . and .. from slaves"
-
         RETURN(rc);
 }
 
@@ -839,27 +872,30 @@ int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        struct mea *mea = data->mea1;
         int rc, i = 0;
         ENTRY;
         lmv_connect(obd);
         if (data->namelen != 0) {
-                i = mea_name2idx(mea, (char *) data->name, data->namelen);
-                if (mea)
-                        data->fid1 = mea->mea_fids[i];
+                struct lmv_obj *obj;
+                obj = lmv_grab_obj(obd, &data->fid1, 0);
+                if (obj) {
+                        i = raw_name2idx(obj->objcount, data->name,
+                                         data->namelen);
+                        data->fid1 = obj->objs[i].fid;
+                        lmv_put_obj(obj);
+                }
                 CDEBUG(D_OTHER, "unlink '%*s' in %lu/%lu/%lu -> %u\n",
                        data->namelen, data->name,
                        (unsigned long) data->fid1.mds,
                        (unsigned long) data->fid1.id,
                        (unsigned long) data->fid1.generation, i);
         } else {
-                i = data->fid1.mds;
                 CDEBUG(D_OTHER, "drop i_nlink on %lu/%lu/%lu\n",
                        (unsigned long) data->fid1.mds,
                        (unsigned long) data->fid1.id,
                        (unsigned long) data->fid1.generation);
         }
-        rc = md_unlink(lmv->tgts[i].exp, data, request); 
+        rc = md_unlink(lmv->tgts[data->fid1.mds].exp, data, request); 
         RETURN(rc);
 }
 
@@ -902,6 +938,26 @@ int lmv_init_ea_size(struct obd_export *exp, int easize, int cookiesize)
         RETURN(rc);
 }
 
+int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
+                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct lov_stripe_md obj_md;
+        struct lov_stripe_md *obj_mdp = &obj_md;
+        int rc = 0;
+        ENTRY;
+        lmv_connect(obd);
+
+        LASSERT(ea == NULL);
+        LASSERT(oa->o_mds < lmv->count);
+
+        rc = obd_create(lmv->tgts[oa->o_mds].exp, oa, &obj_mdp, oti);
+        LASSERT(rc == 0);
+
+        RETURN(rc);
+}
+
 /*
  * to be called from MDS only
  */
@@ -916,8 +972,12 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
         ENTRY;
         lmv_connect(obd);
 
-        LASSERT(ea != NULL);
         LASSERT(oa != NULL);
+        
+        if (ea == NULL) {
+                rc = lmv_obd_create_single(exp, oa, NULL, oti);
+                RETURN(rc);
+        }
 
         if (*ea == NULL) {
                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **) ea);
diff --git a/lustre/lvfs/fsfilt_ext3.c b/lustre/lvfs/fsfilt_ext3.c
index b154e8ae9e2f4beb0ea9cab07357da0c7fe4ec52..46eb191bf9c11dc127f5f448da6ca023ad0c3ccc 100644
--- a/lustre/lvfs/fsfilt_ext3.c
+++ b/lustre/lvfs/fsfilt_ext3.c
@@ -925,7 +925,7 @@ static int fsfilt_ext3_add_dir_entry(struct obd_device *obd,
         
         l_dput(dentry);
 
-        return err;
+        RETURN(err);
 #else
 #error "rebuild kernel and lustre with ext3-mds-num patch!"
         LASSERT(0);
diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c
index fd7846d7723593c3073948dcf3125955b22c4e4c..9d9e64edcfac0cbca363b25839ef0cea99efb9e5 100644
--- a/lustre/mds/handler.c
+++ b/lustre/mds/handler.c
@@ -723,9 +723,6 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
         return(rc);
 }
 
-#define DENTRY_VALID(dentry)    \
-        ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF))
-
 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                             struct lustre_handle *child_lockh, int child_part)
 {
@@ -1182,9 +1179,9 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         char fidname[LL_FID_NAMELEN];
         struct inode *parent_inode;
         struct obd_run_ctxt saved;
-        struct dentry *new_child;
         int err, namelen, mealen;
         struct obd_ucred uc;
+        struct dentry *new;
         struct mea *mea;
         void *handle;
         ENTRY;
@@ -1204,14 +1201,50 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+
+        if (!(body->oa.o_valid & OBD_MD_FLID)) {
+                /* this is request from another MDS to create remove dir inode */
+                unsigned int tmpname = ll_insecure_random_int();
+
+                handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
+                LASSERT(!IS_ERR(handle));
+
+                sprintf(fidname, "%u", tmpname);
+                new = simple_mkdir(mds->mds_objects_dir, fidname,
+                                        body->oa.o_mode, 1);
+                LASSERT(!IS_ERR(new));
+                LASSERT(new->d_inode != NULL);
+
+                obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+                repbody->oa.o_id = new->d_inode->i_ino;
+                repbody->oa.o_generation = new->d_inode->i_generation;
+                repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+
+                rc = fsfilt_del_dir_entry(obd, new);
+                LASSERT(rc == 0);
+
+                rc = fsfilt_commit(obd, parent_inode, handle, 0);
+                LASSERT(rc == 0);
+
+                CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
+                       (unsigned long) new->d_inode->i_ino,
+                       (unsigned long) new->d_inode->i_generation,
+                       (unsigned) new->d_inode->i_mode);
+
+                l_dput(new);
+                pop_ctxt(&saved, &obd->obd_ctxt, &uc);
+                RETURN(0);
+        }
+
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
        
         namelen = ll_fid2str(fidname, body->oa.o_id, body->oa.o_generation);
         
         down(&parent_inode->i_sem);
-        new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
-        if (new_child->d_inode != NULL) {
+        new = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
+        if (new->d_inode != NULL) {
                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
                        repbody->oa.o_id, repbody->oa.o_generation);
                 LBUG();
@@ -1221,7 +1254,7 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         /* FIXME: error handling here */
         LASSERT(!IS_ERR(handle));
 
-        rc = vfs_mkdir(parent_inode, new_child, body->oa.o_mode);
+        rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
         up(&parent_inode->i_sem);
         /* FIXME: error handling here */
         if (rc)
@@ -1233,14 +1266,14 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         OBD_ALLOC(mea, mealen);
         LASSERT(mea != NULL);
         mea->mea_count = 0;
-        down(&new_child->d_inode->i_sem);
-        handle = fsfilt_start(obd, new_child->d_inode, FSFILT_OP_SETATTR, NULL);
+        down(&new->d_inode->i_sem);
+        handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
         LASSERT(!IS_ERR(handle));
-	rc = fsfilt_set_md(obd, new_child->d_inode, handle, mea, mealen);
+	rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
         LASSERT(rc == 0);
-        fsfilt_commit(obd, new_child->d_inode, handle, 0);
+        fsfilt_commit(obd, new->d_inode, handle, 0);
         LASSERT(rc == 0);
-	up(&new_child->d_inode->i_sem);
+	up(&new->d_inode->i_sem);
         OBD_FREE(mea, mealen);
 
         err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
@@ -1248,16 +1281,16 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         /* FIXME: error handling here */
         LASSERT(err == 0);
 
-        obdo_from_inode(&repbody->oa, new_child->d_inode, FILTER_VALID_FLAGS);
-        repbody->oa.o_id = new_child->d_inode->i_ino;
-        repbody->oa.o_generation = new_child->d_inode->i_generation;
+        obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+        repbody->oa.o_id = new->d_inode->i_ino;
+        repbody->oa.o_generation = new->d_inode->i_generation;
         CDEBUG(D_OTHER, "created dirobj: %lu, %lu mode %o, uid %u, gid %u\n",
                         (unsigned long) repbody->oa.o_id,
-                        (unsigned long) new_child->d_inode->i_ino,
-                        (unsigned) new_child->d_inode->i_mode,
-                        (unsigned) new_child->d_inode->i_uid,
-                        (unsigned) new_child->d_inode->i_gid);
-        dput(new_child);
+                        (unsigned long) new->d_inode->i_ino,
+                        (unsigned) new->d_inode->i_mode,
+                        (unsigned) new->d_inode->i_uid,
+                        (unsigned) new->d_inode->i_gid);
+        dput(new);
         pop_ctxt(&saved, &obd->obd_ctxt, &uc);
         RETURN(0);
 }
@@ -1322,6 +1355,7 @@ static int mds_set_info(struct obd_export *exp, __u32 keylen,
 {
         struct obd_device *obd;
         struct mds_obd *mds;
+        int rc;
         ENTRY;
 
         obd = class_exp2obd(exp);
@@ -1342,6 +1376,8 @@ static int mds_set_info(struct obd_export *exp, __u32 keylen,
                                         atomic_read(&mds->mds_real_clients));
                         exp->exp_flags |= OBD_OPT_REAL_CLIENT;
                 }
+                rc = mds_lmv_connect(obd, mds->mds_lmv_name);
+                LASSERT(rc == 0);
                 RETURN(0);
         }
 
diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h
index cb41877a783d6012ca470a10b776712311918ac0..c07a5c6855854fcc036efc58ac7a155f734507b7 100644
--- a/lustre/mds/mds_internal.h
+++ b/lustre/mds/mds_internal.h
@@ -12,6 +12,8 @@ struct mds_filter_data {
 };
 
 #define MDS_FILTERDATA(inode) ((struct mds_filter_data *)(inode)->i_filterdata)
+#define DENTRY_VALID(dentry)    \
+        ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF))
 
 static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
 {
@@ -116,5 +118,6 @@ int mds_lmv_disconnect(struct obd_device *obd, int flags);
 int mds_try_to_split_dir(struct obd_device *, struct dentry *, struct mea **,
                          int);
 int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *);
+int mds_choose_mdsnum(struct obd_device *, const char *, int);
 
 #endif /* _MDS_INTERNAL_H */
diff --git a/lustre/mds/mds_lib.c b/lustre/mds/mds_lib.c
index a40717685d2aafa122e92c8abd265f425a46ccbe..327fd70a3b24bf4eb7952a797ee753fa9465fe78 100644
--- a/lustre/mds/mds_lib.c
+++ b/lustre/mds/mds_lib.c
@@ -83,7 +83,13 @@ void mds_pack_inode2body(struct obd_device *obd, struct mds_body *b,
         b->flags = inode->i_flags;
         b->rdev = inode->i_rdev;
         /* Return the correct link count for orphan inodes */
-        b->nlink = mds_inode_is_orphan(inode) ? 0 : inode->i_nlink;
+        if (mds_inode_is_orphan(inode)) {
+                b->nlink = 0;
+        } else if (S_ISDIR(inode->i_mode)) {
+                b->nlink = 1;
+        } else {
+                b->nlink = inode->i_nlink;
+        }
         b->generation = inode->i_generation;
         b->suppgid = -1;
         b->mds = obd->u.mds.mds_num;
diff --git a/lustre/mds/mds_lmv.c b/lustre/mds/mds_lmv.c
index 8afb801e5f3a220de606a61f879a756c7291a358..3a5c838b15f133a6a2dcfb2c46ff1a11af211151 100644
--- a/lustre/mds/mds_lmv.c
+++ b/lustre/mds/mds_lmv.c
@@ -117,7 +117,8 @@ int mds_lmv_postsetup(struct obd_device *obd)
         struct mds_obd *mds = &obd->u.mds;
         ENTRY;
         if (mds->mds_lmv_exp)
-                obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize, 0);
+                obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
+                                 mds->mds_max_cookiesize);
         RETURN(0);
 }
 
@@ -396,7 +397,7 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
         return 0;
 }
 
-#define MAX_DIR_SIZE    (32 * 1024)
+#define MAX_DIR_SIZE    (64 * 1024)
 
 /*
  * must not be called on already splitted directories
@@ -404,15 +405,11 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
 int mds_try_to_split_dir(struct obd_device *obd,
                          struct dentry *dentry, struct mea **mea, int nstripes)
 {
-        ldlm_policy_data_t policy = { .l_inodebits = {MDS_INODELOCK_UPDATE}};
-        struct ldlm_res_id res_id = { .name = {0} };
         struct inode *dir = dentry->d_inode;
         struct mds_obd *mds = &obd->u.mds;
-        struct lustre_handle lockh;
         struct mea *tmea = NULL;
         struct obdo *oa = NULL;
-	int rc, flags = 0;
-	int mea_size = 0;
+	int rc, mea_size = 0;
 	void *handle;
 	ENTRY;
 
@@ -424,7 +421,7 @@ int mds_try_to_split_dir(struct obd_device *obd,
         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
                 RETURN(0);
 
-#if 0
+#if 1
         if (dir->i_size < MAX_DIR_SIZE)
                 RETURN(0);
 #endif
@@ -445,23 +442,13 @@ int mds_try_to_split_dir(struct obd_device *obd,
            necessary amount of stripes, but on the other hand with this approach
            of allocating maximal possible amount of MDS slots, it would be
            easier to split the dir over more MDSes */
-        rc = obd_alloc_diskmd(mds->mds_lmv_exp, mea);
+        rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea);
         if (!(*mea))
                 RETURN(-ENOMEM);
         (*mea)->mea_count = nstripes;
+       
+#warning "we have to take EX lock on a dir for splitting"
         
-        /* convert lock on the dir in order tox
-         * invalidate client's attributes -bzzz */
-        res_id.name[0] = dir->i_ino;
-        res_id.name[1] = dir->i_generation;
-        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
-                              LDLM_IBITS, &policy, LCK_PW, &flags,
-                              mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
-                              NULL, 0, NULL, &lockh);
-        if (rc != ELDLM_OK) {
-                CERROR("error: rc = %d\n", rc);
-        }
-
 	/* 1) create directory objects on slave MDS'es */
 	/* FIXME: should this be OBD method? */
         oa = obdo_alloc();
@@ -496,8 +483,6 @@ int mds_try_to_split_dir(struct obd_device *obd,
 	up(&dir->i_sem);
 	obdo_free(oa);
 
-        ldlm_lock_decref(&lockh, LCK_PW);
-
 	/* 3) read through the dir and distribute it over objects */
         scan_and_distribute(obd, dentry, *mea);
 
@@ -641,3 +626,13 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
+int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lmv_obd *lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
+        int i;
+
+        i = raw_name2idx(lmv->count, name, len);
+        RETURN(i);
+}
+
diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c
index 9f7dd3a691038f77136ba5da72d3225d573e3b3a..d3615d416f1cc9d2ef0122f085a97a7430b338fe 100644
--- a/lustre/mds/mds_reint.c
+++ b/lustre/mds/mds_reint.c
@@ -625,24 +625,74 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         }
         case S_IFDIR:{
                 int nstripes = 0;
-                handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
-                if (IS_ERR(handle))
-                        GOTO(cleanup, rc = PTR_ERR(handle));
+                int i;
+                
+                /* as Peter asked, mkdir() should distribute new directories
+                 * over the whole cluster in order to distribute namespace
+                 * processing load. first, we calculate which MDS to use to
+                 * put new directory's inode in */
+                i = mds_choose_mdsnum(obd, rec->ur_name, rec->ur_namelen - 1);
+                if (i == mds->mds_num) {
+                        /* inode will be created locally */
 
-                rc = vfs_mkdir(dir, dchild, rec->ur_mode);
+                        handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
+                        if (IS_ERR(handle))
+                                GOTO(cleanup, rc = PTR_ERR(handle));
 
-                if (rec->ur_eadata)
-                        nstripes = *(u16 *)rec->ur_eadata;
+                        rc = vfs_mkdir(dir, dchild, rec->ur_mode);
+
+                        if (rec->ur_eadata)
+                                nstripes = *(u16 *)rec->ur_eadata;
 
 #if 1
-                /* this is for current testing yet. after the testing
-                 * directory will split if size reaches some limite -bzzz */
-                if (rc == 0) {
+                        /* this is for current testing yet. after the testing
+                         * directory will split if size reaches some limite -bzzz */
+                        if (rc == 0) {
 #else
-                if (rc == 0 && nstripes) {
+                        if (rc == 0 && nstripes) {
 #endif
-                        /* FIXME: error handling here */
-                        mds_try_to_split_dir(obd, dchild, NULL, nstripes);
+                                /* FIXME: error handling here */
+                                mds_try_to_split_dir(obd, dchild, NULL, nstripes);
+                        }
+                } else if (!DENTRY_VALID(dchild)) {
+                        /* inode will be created on another MDS */
+                        struct obdo *oa = NULL;
+                        struct mds_body *body;
+                        
+                        /* first, create that inode */
+                        oa = obdo_alloc();
+                        LASSERT(oa != NULL);
+                        oa->o_mds = i;
+                        obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+                                        OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+                                        OBD_MD_FLUID | OBD_MD_FLGID);
+                        oa->o_mode = dir->i_mode;
+                        CDEBUG(D_OTHER, "%s: create dir on MDS %u\n",
+                                        obd->obd_name, i);
+                        rc = obd_create(mds->mds_lmv_exp, oa, NULL, NULL);
+	                LASSERT(rc == 0);
+                        
+                        /* now, add new dir entry for it */
+                        handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
+                        if (IS_ERR(handle))
+                                GOTO(cleanup, rc = PTR_ERR(handle));
+                        rc = fsfilt_add_dir_entry(obd, dparent, rec->ur_name,
+                                                  rec->ur_namelen - 1,
+                                                  oa->o_id, oa->o_generation,
+                                                  i);
+                        LASSERT(rc == 0);
+
+                        /* fill reply */
+                        body = lustre_msg_buf(req->rq_repmsg,
+                                              offset, sizeof (*body));
+                        body->valid |= OBD_MD_FLID | OBD_MD_MDS;
+                        body->fid1.id = oa->o_id;
+                        body->fid1.mds = i;
+                        body->fid1.generation = oa->o_generation;
+	                obdo_free(oa);
+                } else {
+                        /* requested name exists in the directory */
+                        rc = -EEXIST;
                 }
                 EXIT;
                 break;
@@ -683,7 +733,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         if (rc) {
                 CDEBUG(D_INODE, "error during create: %d\n", rc);
                 GOTO(cleanup, rc);
-        } else {
+        } else if (dchild->d_inode) {
                 struct iattr iattr;
                 struct inode *inode = dchild->d_inode;
                 struct mds_body *body;
@@ -1249,7 +1299,6 @@ int mds_create_local_dentry(struct mds_update_record *rec,
         char *fidname = rec->ur_name;
         struct dentry *child = NULL;
         struct lustre_handle lockh;
-        unsigned mode;
         void *handle;
         ENTRY;
 
@@ -1301,8 +1350,13 @@ int mds_create_local_dentry(struct mds_update_record *rec,
                 CERROR("error linking orphan %lu/%lu to FIDS: rc = %d\n",
                        (unsigned long) child->d_inode->i_ino,
                        (unsigned long) child->d_inode->i_generation, rc);
-        else
+        else {
+                if (S_ISDIR(child->d_inode->i_mode)) {
+                        fids_dir->i_nlink++;
+                        mark_inode_dirty(fids_dir);
+                }
                 mark_inode_dirty(child->d_inode);
+        }
         fsfilt_commit(obd, fids_dir, handle, 0);
 
         rec->ur_fid1->id = fids_dir->i_ino;
@@ -1325,7 +1379,6 @@ cleanup:
 static int mds_copy_unlink_reply(struct ptlrpc_request *master,
                                         struct ptlrpc_request *slave)
 {
-        struct lov_mds_md *eadata;
         void *cookie, *cookie2;
         struct mds_body *body2;
         struct mds_body *body;
@@ -1339,7 +1392,6 @@ static int mds_copy_unlink_reply(struct ptlrpc_request *master,
         LASSERT(body2 != NULL);
 
         if (!(body->valid & (OBD_MD_FLID | OBD_MD_FLGENER))) {
-                CWARN("empty reply\n");
                 RETURN(0);
         }
 
@@ -1395,8 +1447,6 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
 
         LASSERT(offset == 0 || offset == 2);
 
-        DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
-                  rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
         DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode %u/%u/%u)\n",
                   rec->ur_namelen - 1, rec->ur_name, (unsigned)dchild->d_mdsnum,
                   (unsigned) dchild->d_inum, (unsigned) dchild->d_generation);
@@ -1480,7 +1530,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 LASSERT(unlink_by_fid == 0);
                 LASSERT(dchild->d_mdsnum != mds->mds_num);
                 mds_reint_unlink_remote(rec, offset, req, parent_lockh,
-                                        dparent, &child_lockh, dchild);
+                                             dparent, &child_lockh, dchild);
                 RETURN(0);
         }
 
@@ -2143,14 +2193,18 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         cleanup_phase = 3; /* original name dentry */
 
         inode = (*de_oldp)->d_inode;
-        if (inode != NULL)
+        if (inode != NULL) {
                 inode = igrab(inode);
-        if (inode == NULL)
-                GOTO(cleanup, rc = -ENOENT);
+                if (inode == NULL)
+                        GOTO(cleanup, rc = -ENOENT);
 
-        c1_res_id.name[0] = inode->i_ino;
-        c1_res_id.name[1] = inode->i_generation;
-        iput(inode);
+                c1_res_id.name[0] = inode->i_ino;
+                c1_res_id.name[1] = inode->i_generation;
+                iput(inode);
+        } else if ((*de_oldp)->d_flags & DCACHE_CROSS_REF) {
+                c1_res_id.name[0] = (*de_oldp)->d_inum;
+                c1_res_id.name[1] = (*de_oldp)->d_generation;
+        }
 
         /* Step 4: Lookup the target child entry */
         *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
@@ -2164,15 +2218,18 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         cleanup_phase = 4; /* target dentry */
 
         inode = (*de_newp)->d_inode;
-        if (inode != NULL)
+        if (inode != NULL) {
                 inode = igrab(inode);
-        if (inode == NULL)
-                goto retry_locks;
-
-        c2_res_id.name[0] = inode->i_ino;
-        c2_res_id.name[1] = inode->i_generation;
+                if (inode == NULL)
+                        goto retry_locks;
 
-        iput(inode);
+                c2_res_id.name[0] = inode->i_ino;
+                c2_res_id.name[1] = inode->i_generation;
+                iput(inode);
+        } else if ((*de_newp)->d_flags & DCACHE_CROSS_REF) {
+                c2_res_id.name[0] = (*de_newp)->d_inum;
+                c2_res_id.name[1] = (*de_newp)->d_generation;
+        }
 
 retry_locks:
         /* Step 5: Take locks on the parents and child(ren) */
@@ -2213,7 +2270,7 @@ retry_locks:
                 GOTO(cleanup, rc);
         }
 
-        if ((*de_oldp)->d_inode == NULL)
+        if (!DENTRY_VALID(*de_oldp))
                 GOTO(cleanup, rc = -ENOENT);
 
         /* Step 6b: Re-lookup target child to verify it hasn't changed */
diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c
index b967b92e396ab2b6b5c7d0fe9b1f144461dc353c..20f9d943f4d678c86cee42037decd2008c061e1c 100644
--- a/lustre/obdclass/class_obd.c
+++ b/lustre/obdclass/class_obd.c
@@ -439,7 +439,6 @@ EXPORT_SYMBOL(class_detach);
 
 /* mea.c */
 EXPORT_SYMBOL(mea_name2idx);
-int raw_name2idx(int count, char *name, int namelen);
 EXPORT_SYMBOL(raw_name2idx);
 
 #ifdef LPROCFS
diff --git a/lustre/obdclass/mea.c b/lustre/obdclass/mea.c
index ceba64ee1d783916bd19a62d56992ce6cd34d0c2..56fc149ee55a49a49c93864b8c150606a00aa987 100644
--- a/lustre/obdclass/mea.c
+++ b/lustre/obdclass/mea.c
@@ -54,7 +54,7 @@ int mea_name2idx(struct mea *mea, char *name, int namelen)
         return c;
 }
 
-int raw_name2idx(int count, char *name, int namelen)
+int raw_name2idx(int count, const char *name, int namelen)
 {
         unsigned int c;