From e2d42b61949a4ffc08a3a8245f0572543350bd2c Mon Sep 17 00:00:00 2001 From: alex <alex> Date: Wed, 19 May 2004 09:00:24 +0000 Subject: [PATCH] - raw_name2idx declaration to avoid warnings - ldlm_cli_enqueue() takes 2nd number from resource name into account to recognize that returned lock changed. we need this because MDS may return LOOKUP lock with the same ino, but another generation. the case is very simple: mkdir a; then touch a/b; remount; lookup a/b; a and b may live on different MDSs and have the same ino - lmv_handle_remote_inode() changes intent from IT_LOOKUP to IT_GETATTR: caller expects to find attributes in reply - lmv_intent_lookup() chooses right MDS to revalidate the name - lmv_create() chooses right MDS where to forward request to - lmv_link() chooses right mds where to forward request to - lmv_unlink() chooses right mds where to forward request to - lmv_readpage() removes . and .. from all the pages, except the ones that come from master MDS for given directory - lmv_obd_create_single() requests creation of single directory on given MDS - mdt_obj_create() creates a directory for new mkdir() semantics - each time new llite connects to MDS, the latter tries to connect to its own LMV - mds_pack_inode2body() returns nlink=1 for directories - mds_lmv_postsetup() passes valid cookiesize to lmv and down to the stack - mds_reint_create() distributes new dir's inodes over the cluster - bug in mds_create_local_dentry() fixed: rmdir may cause FIDS/ removal - mds_get_parents_children_locked() recognizes cross-ref dentries --- lustre/include/linux/obd_class.h | 3 +- lustre/ldlm/ldlm_request.c | 4 +- lustre/lmv/lmv_intent.c | 34 ++++++--- lustre/lmv/lmv_obd.c | 112 ++++++++++++++++++++------- lustre/lvfs/fsfilt_ext3.c | 2 +- lustre/mds/handler.c | 76 ++++++++++++++----- lustre/mds/mds_internal.h | 3 + lustre/mds/mds_lib.c | 8 +- lustre/mds/mds_lmv.c | 41 +++++----- lustre/mds/mds_reint.c | 125 ++++++++++++++++++++++--------- lustre/obdclass/class_obd.c | 1 - lustre/obdclass/mea.c | 
2 +- 12 files changed, 292 insertions(+), 119 deletions(-) diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 98bb2d3973..5384a68c01 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -1350,6 +1350,7 @@ void class_init_uuidlist(void); void class_exit_uuidlist(void); /* mea.c */ -int mea_name2idx(struct mea *mea, char *name, int namelen); +int mea_name2idx(struct mea *, char *, int); +int raw_name2idx(int, const char *, int); #endif /* __LINUX_OBD_CLASS_H */ diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 37868ca320..914a06a828 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -354,7 +354,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, } if (reply->lock_desc.l_resource.lr_name.name[0] != - lock->l_resource->lr_name.name[0]) { + lock->l_resource->lr_name.name[0] || + reply->lock_desc.l_resource.lr_name.name[1] != + lock->l_resource->lr_name.name[1]) { CDEBUG(D_INFO, "remote intent success, locking %ld " "instead of %ld\n", (long)reply->lock_desc.l_resource.lr_name.name[0], diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 03b8f3b443..81e4baa1d1 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -72,6 +72,12 @@ int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt, struct lustre_handle plock; int pmode; + if (it->it_op == IT_LOOKUP) { + /* unfortunately, we have to lie to MDC/MDS to + * retrieve attributes llite needs */ + it->it_op = IT_GETATTR; + } + /* we got LOOKUP lock, but we really need attrs */ pmode = it->d.lustre.it_lock_mode; if (pmode) { @@ -171,10 +177,10 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt, rc = lmv_revalidate_slaves(exp, reqp, cfid, it, 1, cb_blocking); } else if (S_ISDIR(body->mode)) { - CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n", + /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n", (unsigned long) cfid->mds, (unsigned long) cfid->id, 
- (unsigned long) cfid->generation); + (unsigned long) cfid->generation);*/ } lmv_put_obj(obj); RETURN(rc); @@ -433,17 +439,25 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt, * cfid != NULL specifies revalidation */ if (cfid) { - /* this is revalidation during revalidation it's - * enough to return 1 if we think attrs are uptodate - * it may return updated attrs, though */ - mds = cfid->mds; + /* this is revalidation: we have to check is LOOKUP + * lock still valid for given fid. very important + * part is that we have to choose right mds because + * namespace is per mds */ + rpfid = *pfid; + obj = lmv_grab_obj(obd, pfid, 0); + if (obj) { + mds = raw_name2idx(obj->objcount, (char *) name, len); + rpfid = obj->objs[mds].fid; + lmv_put_obj(obj); + } + mds = rpfid.mds; + CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n", + (unsigned long) cfid->mds, + (unsigned long) cfid->id, + (unsigned long) cfid->generation, mds); rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name, len, lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); - CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu = %d\n", - (unsigned long) cfid->mds, - (unsigned long) cfid->id, - (unsigned long) cfid->generation, rc); RETURN(rc); } diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 89bd1c20a9..4932ea878e 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -33,6 +33,7 @@ #else #include <liblustre.h> #endif +#include <linux/ext2_fs.h> #include <linux/obd_support.h> #include <linux/lustre_lib.h> @@ -457,7 +458,8 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data, struct lmv_obd *lmv = &obd->u.lmv; struct mea *mea = op_data->mea1; struct mds_body *mds_body; - int rc, i, free_mea = 0; + int rc, i, mds, free_mea = 0; + struct lmv_obj *obj; ENTRY; lmv_connect(obd); /* TODO: where to create new directories? 
@@ -465,17 +467,21 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data, * but we lookup by name may forward any request in slave */ repeat: - i = mea_name2idx(mea, (char *) op_data->name, op_data->namelen); - if (mea) - op_data->fid1 = mea->mea_fids[i]; + obj = lmv_grab_obj(obd, &op_data->fid1, 0); + if (obj) { + mds = raw_name2idx(obj->objcount, op_data->name, + op_data->namelen - 1); + op_data->fid1 = obj->objs[mds].fid; + lmv_put_obj(obj); + } CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu (mea 0x%p)\n", op_data->namelen, op_data->name, (unsigned long) op_data->fid1.mds, (unsigned long) op_data->fid1.id, (unsigned long) op_data->fid1.generation, mea); - rc = md_create(lmv->tgts[i].exp, op_data, data, datalen, - mode, uid, gid, rdev, request); + rc = md_create(lmv->tgts[op_data->fid1.mds].exp, op_data, data, + datalen, mode, uid, gid, rdev, request); if (rc == 0) { if (*request == NULL) RETURN(rc); @@ -484,13 +490,16 @@ repeat: LASSERT(mds_body != NULL); CDEBUG(D_OTHER, "created. 
id = %lu, generation = %lu, mds = %d\n", (unsigned long) mds_body->fid1.id, - (unsigned long) mds_body->fid1.generation, i); - LASSERT(mds_body->mds == i); + (unsigned long) mds_body->fid1.generation, + op_data->fid1.mds); + LASSERT(mds_body->valid & OBD_MD_MDS || + mds_body->mds == op_data->fid1.mds); } else if (rc == -ESTALE) { struct ptlrpc_request *req = NULL; struct lustre_md md; int mealen; + LBUG(); /* FIXME ASAP */ CDEBUG(D_OTHER, "it seems MDS splitted dir\n"); LASSERT(mea == NULL); @@ -597,30 +606,33 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct mea *mea = data->mea2; - int rc, i; + struct lmv_obj *obj; + int rc; ENTRY; lmv_connect(obd); if (data->namelen != 0) { /* usual link request */ - i = mea_name2idx(mea, (char *) data->name, data->namelen); - if (mea) - data->fid2 = mea->mea_fids[i]; - CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d mea %p\n", + obj = lmv_grab_obj(obd, &data->fid1, 0); + if (obj) { + rc = raw_name2idx(obj->objcount, data->name, + data->namelen); + data->fid1 = obj->objs[rc].fid; + lmv_put_obj(obj); + } + CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d\n", (unsigned) data->fid2.mds, (unsigned) data->fid2.id, (unsigned) data->fid2.generation, data->namelen, data->name, (unsigned) data->fid1.mds, (unsigned) data->fid1.id, - (unsigned) data->fid1.generation, i, mea); + (unsigned) data->fid1.generation, data->fid1.mds); } else { /* request from MDS to acquire i_links for inode by fid1 */ - i = data->fid1.mds; CDEBUG(D_OTHER, "inc i_nlinks for %u/%u/%u\n", (unsigned) data->fid1.mds, (unsigned) data->fid1.id, (unsigned) data->fid1.generation); } - rc = md_link(lmv->tgts[i].exp, data, request); + rc = md_link(lmv->tgts[data->fid1.mds].exp, data, request); RETURN(rc); } @@ -791,6 +803,23 @@ int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, RETURN(0); } +void lmv_remove_dots(struct page *page) +{ + char *kaddr = 
page_address(page); + unsigned limit = PAGE_CACHE_SIZE; + unsigned offs, rec_len; + struct ext2_dir_entry_2 *p; + + for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) { + p = (struct ext2_dir_entry_2 *)(kaddr + offs); + rec_len = le16_to_cpu(p->rec_len); + + if ((p->name_len == 1 && p->name[0] == '.') || + (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.')) + p->inode = 0; + } +} + int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, __u64 offset, struct page *page, struct ptlrpc_request **request) @@ -826,11 +855,15 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, (unsigned long) offset); } rc = md_readpage(lmv->tgts[rfid.mds].exp, &rfid, offset, page, request); + if (rc == 0 && !fid_equal(&rfid, mdc_fid)) { + /* this page isn't from master object. to avoid + * ./.. duplication in directory, we have to remove them + * from all slave objects */ + lmv_remove_dots(page); + } lmv_put_obj(obj); -#warning "we need fix for duplicate . and .. 
from slaves" - RETURN(rc); } @@ -839,27 +872,30 @@ int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data, { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct mea *mea = data->mea1; int rc, i = 0; ENTRY; lmv_connect(obd); if (data->namelen != 0) { - i = mea_name2idx(mea, (char *) data->name, data->namelen); - if (mea) - data->fid1 = mea->mea_fids[i]; + struct lmv_obj *obj; + obj = lmv_grab_obj(obd, &data->fid1, 0); + if (obj) { + i = raw_name2idx(obj->objcount, data->name, + data->namelen); + data->fid1 = obj->objs[i].fid; + lmv_put_obj(obj); + } CDEBUG(D_OTHER, "unlink '%*s' in %lu/%lu/%lu -> %u\n", data->namelen, data->name, (unsigned long) data->fid1.mds, (unsigned long) data->fid1.id, (unsigned long) data->fid1.generation, i); } else { - i = data->fid1.mds; CDEBUG(D_OTHER, "drop i_nlink on %lu/%lu/%lu\n", (unsigned long) data->fid1.mds, (unsigned long) data->fid1.id, (unsigned long) data->fid1.generation); } - rc = md_unlink(lmv->tgts[i].exp, data, request); + rc = md_unlink(lmv->tgts[data->fid1.mds].exp, data, request); RETURN(rc); } @@ -902,6 +938,26 @@ int lmv_init_ea_size(struct obd_export *exp, int easize, int cookiesize) RETURN(rc); } +int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lov_stripe_md obj_md; + struct lov_stripe_md *obj_mdp = &obj_md; + int rc = 0; + ENTRY; + lmv_connect(obd); + + LASSERT(ea == NULL); + LASSERT(oa->o_mds < lmv->count); + + rc = obd_create(lmv->tgts[oa->o_mds].exp, oa, &obj_mdp, oti); + LASSERT(rc == 0); + + RETURN(rc); +} + /* * to be called from MDS only */ @@ -916,8 +972,12 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, ENTRY; lmv_connect(obd); - LASSERT(ea != NULL); LASSERT(oa != NULL); + + if (ea == NULL) { + rc = lmv_obd_create_single(exp, oa, NULL, oti); + RETURN(rc); + } if (*ea == NULL) { rc = 
obd_alloc_diskmd(exp, (struct lov_mds_md **) ea); diff --git a/lustre/lvfs/fsfilt_ext3.c b/lustre/lvfs/fsfilt_ext3.c index b154e8ae9e..46eb191bf9 100644 --- a/lustre/lvfs/fsfilt_ext3.c +++ b/lustre/lvfs/fsfilt_ext3.c @@ -925,7 +925,7 @@ static int fsfilt_ext3_add_dir_entry(struct obd_device *obd, l_dput(dentry); - return err; + RETURN(err); #else #error "rebuild kernel and lustre with ext3-mds-num patch!" LASSERT(0); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index fd7846d772..9d9e64edcf 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -723,9 +723,6 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, return(rc); } -#define DENTRY_VALID(dentry) \ - ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF)) - static int mds_getattr_name(int offset, struct ptlrpc_request *req, struct lustre_handle *child_lockh, int child_part) { @@ -1182,9 +1179,9 @@ static int mdt_obj_create(struct ptlrpc_request *req) char fidname[LL_FID_NAMELEN]; struct inode *parent_inode; struct obd_run_ctxt saved; - struct dentry *new_child; int err, namelen, mealen; struct obd_ucred uc; + struct dentry *new; struct mea *mea; void *handle; ENTRY; @@ -1204,14 +1201,50 @@ static int mdt_obj_create(struct ptlrpc_request *req) if (rc) RETURN(rc); + repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody)); + + if (!(body->oa.o_valid & OBD_MD_FLID)) { + /* this is request from another MDS to create remove dir inode */ + unsigned int tmpname = ll_insecure_random_int(); + + handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL); + LASSERT(!IS_ERR(handle)); + + sprintf(fidname, "%u", tmpname); + new = simple_mkdir(mds->mds_objects_dir, fidname, + body->oa.o_mode, 1); + LASSERT(!IS_ERR(new)); + LASSERT(new->d_inode != NULL); + + obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS); + repbody->oa.o_id = new->d_inode->i_ino; + repbody->oa.o_generation = new->d_inode->i_generation; + repbody->oa.o_valid |= OBD_MD_FLID | 
OBD_MD_FLGENER; + + rc = fsfilt_del_dir_entry(obd, new); + LASSERT(rc == 0); + + rc = fsfilt_commit(obd, parent_inode, handle, 0); + LASSERT(rc == 0); + + CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n", + (unsigned long) new->d_inode->i_ino, + (unsigned long) new->d_inode->i_generation, + (unsigned) new->d_inode->i_mode); + + l_dput(new); + pop_ctxt(&saved, &obd->obd_ctxt, &uc); + RETURN(0); + } + repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody)); memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); namelen = ll_fid2str(fidname, body->oa.o_id, body->oa.o_generation); down(&parent_inode->i_sem); - new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen); - if (new_child->d_inode != NULL) { + new = lookup_one_len(fidname, mds->mds_objects_dir, namelen); + if (new->d_inode != NULL) { CERROR("impossible non-negative obj dentry " LPU64":%u!\n", repbody->oa.o_id, repbody->oa.o_generation); LBUG(); @@ -1221,7 +1254,7 @@ static int mdt_obj_create(struct ptlrpc_request *req) /* FIXME: error handling here */ LASSERT(!IS_ERR(handle)); - rc = vfs_mkdir(parent_inode, new_child, body->oa.o_mode); + rc = vfs_mkdir(parent_inode, new, body->oa.o_mode); up(&parent_inode->i_sem); /* FIXME: error handling here */ if (rc) @@ -1233,14 +1266,14 @@ static int mdt_obj_create(struct ptlrpc_request *req) OBD_ALLOC(mea, mealen); LASSERT(mea != NULL); mea->mea_count = 0; - down(&new_child->d_inode->i_sem); - handle = fsfilt_start(obd, new_child->d_inode, FSFILT_OP_SETATTR, NULL); + down(&new->d_inode->i_sem); + handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL); LASSERT(!IS_ERR(handle)); - rc = fsfilt_set_md(obd, new_child->d_inode, handle, mea, mealen); + rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen); LASSERT(rc == 0); - fsfilt_commit(obd, new_child->d_inode, handle, 0); + fsfilt_commit(obd, new->d_inode, handle, 0); LASSERT(rc == 0); - up(&new_child->d_inode->i_sem); + up(&new->d_inode->i_sem); OBD_FREE(mea, mealen); err = 
fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode, @@ -1248,16 +1281,16 @@ static int mdt_obj_create(struct ptlrpc_request *req) /* FIXME: error handling here */ LASSERT(err == 0); - obdo_from_inode(&repbody->oa, new_child->d_inode, FILTER_VALID_FLAGS); - repbody->oa.o_id = new_child->d_inode->i_ino; - repbody->oa.o_generation = new_child->d_inode->i_generation; + obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS); + repbody->oa.o_id = new->d_inode->i_ino; + repbody->oa.o_generation = new->d_inode->i_generation; CDEBUG(D_OTHER, "created dirobj: %lu, %lu mode %o, uid %u, gid %u\n", (unsigned long) repbody->oa.o_id, - (unsigned long) new_child->d_inode->i_ino, - (unsigned) new_child->d_inode->i_mode, - (unsigned) new_child->d_inode->i_uid, - (unsigned) new_child->d_inode->i_gid); - dput(new_child); + (unsigned long) new->d_inode->i_ino, + (unsigned) new->d_inode->i_mode, + (unsigned) new->d_inode->i_uid, + (unsigned) new->d_inode->i_gid); + dput(new); pop_ctxt(&saved, &obd->obd_ctxt, &uc); RETURN(0); } @@ -1322,6 +1355,7 @@ static int mds_set_info(struct obd_export *exp, __u32 keylen, { struct obd_device *obd; struct mds_obd *mds; + int rc; ENTRY; obd = class_exp2obd(exp); @@ -1342,6 +1376,8 @@ static int mds_set_info(struct obd_export *exp, __u32 keylen, atomic_read(&mds->mds_real_clients)); exp->exp_flags |= OBD_OPT_REAL_CLIENT; } + rc = mds_lmv_connect(obd, mds->mds_lmv_name); + LASSERT(rc == 0); RETURN(0); } diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index cb41877a78..c07a5c6855 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -12,6 +12,8 @@ struct mds_filter_data { }; #define MDS_FILTERDATA(inode) ((struct mds_filter_data *)(inode)->i_filterdata) +#define DENTRY_VALID(dentry) \ + ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF)) static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req) { @@ -116,5 +118,6 @@ int mds_lmv_disconnect(struct obd_device *obd, int flags); int 
mds_try_to_split_dir(struct obd_device *, struct dentry *, struct mea **, int); int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *); +int mds_choose_mdsnum(struct obd_device *, const char *, int); #endif /* _MDS_INTERNAL_H */ diff --git a/lustre/mds/mds_lib.c b/lustre/mds/mds_lib.c index a40717685d..327fd70a3b 100644 --- a/lustre/mds/mds_lib.c +++ b/lustre/mds/mds_lib.c @@ -83,7 +83,13 @@ void mds_pack_inode2body(struct obd_device *obd, struct mds_body *b, b->flags = inode->i_flags; b->rdev = inode->i_rdev; /* Return the correct link count for orphan inodes */ - b->nlink = mds_inode_is_orphan(inode) ? 0 : inode->i_nlink; + if (mds_inode_is_orphan(inode)) { + b->nlink = 0; + } else if (S_ISDIR(inode->i_mode)) { + b->nlink = 1; + } else { + b->nlink = inode->i_nlink; + } b->generation = inode->i_generation; b->suppgid = -1; b->mds = obd->u.mds.mds_num; diff --git a/lustre/mds/mds_lmv.c b/lustre/mds/mds_lmv.c index 8afb801e5f..3a5c838b15 100644 --- a/lustre/mds/mds_lmv.c +++ b/lustre/mds/mds_lmv.c @@ -117,7 +117,8 @@ int mds_lmv_postsetup(struct obd_device *obd) struct mds_obd *mds = &obd->u.mds; ENTRY; if (mds->mds_lmv_exp) - obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize, 0); + obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize, + mds->mds_max_cookiesize); RETURN(0); } @@ -396,7 +397,7 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry, return 0; } -#define MAX_DIR_SIZE (32 * 1024) +#define MAX_DIR_SIZE (64 * 1024) /* * must not be called on already splitted directories @@ -404,15 +405,11 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry, int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry, struct mea **mea, int nstripes) { - ldlm_policy_data_t policy = { .l_inodebits = {MDS_INODELOCK_UPDATE}}; - struct ldlm_res_id res_id = { .name = {0} }; struct inode *dir = dentry->d_inode; struct mds_obd *mds = &obd->u.mds; - struct lustre_handle lockh; struct mea *tmea = 
NULL; struct obdo *oa = NULL; - int rc, flags = 0; - int mea_size = 0; + int rc, mea_size = 0; void *handle; ENTRY; @@ -424,7 +421,7 @@ int mds_try_to_split_dir(struct obd_device *obd, if (dentry->d_inode->i_ino == mds->mds_rootfid.id) RETURN(0); -#if 0 +#if 1 if (dir->i_size < MAX_DIR_SIZE) RETURN(0); #endif @@ -445,23 +442,13 @@ int mds_try_to_split_dir(struct obd_device *obd, necessary amount of stripes, but on the other hand with this approach of allocating maximal possible amount of MDS slots, it would be easier to split the dir over more MDSes */ - rc = obd_alloc_diskmd(mds->mds_lmv_exp, mea); + rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea); if (!(*mea)) RETURN(-ENOMEM); (*mea)->mea_count = nstripes; + +#warning "we have to take EX lock on a dir for splitting" - /* convert lock on the dir in order tox - * invalidate client's attributes -bzzz */ - res_id.name[0] = dir->i_ino; - res_id.name[1] = dir->i_generation; - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, - LDLM_IBITS, &policy, LCK_PW, &flags, - mds_blocking_ast, ldlm_completion_ast, NULL, NULL, - NULL, 0, NULL, &lockh); - if (rc != ELDLM_OK) { - CERROR("error: rc = %d\n", rc); - } - /* 1) create directory objects on slave MDS'es */ /* FIXME: should this be OBD method? 
*/ oa = obdo_alloc(); @@ -496,8 +483,6 @@ int mds_try_to_split_dir(struct obd_device *obd, up(&dir->i_sem); obdo_free(oa); - ldlm_lock_decref(&lockh, LCK_PW); - /* 3) read through the dir and distribute it over objects */ scan_and_distribute(obd, dentry, *mea); @@ -641,3 +626,13 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, RETURN(rc); } +int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len) +{ + struct mds_obd *mds = &obd->u.mds; + struct lmv_obd *lmv = &mds->mds_lmv_exp->exp_obd->u.lmv; + int i; + + i = raw_name2idx(lmv->count, name, len); + RETURN(i); +} + diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 9f7dd3a691..d3615d416f 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -625,24 +625,74 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, } case S_IFDIR:{ int nstripes = 0; - handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL); - if (IS_ERR(handle)) - GOTO(cleanup, rc = PTR_ERR(handle)); + int i; + + /* as Peter asked, mkdir() should distribute new directories + * over the whole cluster in order to distribute namespace + * processing load. first, we calculate which MDS to use to + * put new directory's inode in */ + i = mds_choose_mdsnum(obd, rec->ur_name, rec->ur_namelen - 1); + if (i == mds->mds_num) { + /* inode will be created locally */ - rc = vfs_mkdir(dir, dchild, rec->ur_mode); + handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL); + if (IS_ERR(handle)) + GOTO(cleanup, rc = PTR_ERR(handle)); - if (rec->ur_eadata) - nstripes = *(u16 *)rec->ur_eadata; + rc = vfs_mkdir(dir, dchild, rec->ur_mode); + + if (rec->ur_eadata) + nstripes = *(u16 *)rec->ur_eadata; #if 1 - /* this is for current testing yet. after the testing - * directory will split if size reaches some limite -bzzz */ - if (rc == 0) { + /* this is for current testing yet. 
after the testing + * directory will split if size reaches some limite -bzzz */ + if (rc == 0) { #else - if (rc == 0 && nstripes) { + if (rc == 0 && nstripes) { #endif - /* FIXME: error handling here */ - mds_try_to_split_dir(obd, dchild, NULL, nstripes); + /* FIXME: error handling here */ + mds_try_to_split_dir(obd, dchild, NULL, nstripes); + } + } else if (!DENTRY_VALID(dchild)) { + /* inode will be created on another MDS */ + struct obdo *oa = NULL; + struct mds_body *body; + + /* first, create that inode */ + oa = obdo_alloc(); + LASSERT(oa != NULL); + oa->o_mds = i; + obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME | + OBD_MD_FLUID | OBD_MD_FLGID); + oa->o_mode = dir->i_mode; + CDEBUG(D_OTHER, "%s: create dir on MDS %u\n", + obd->obd_name, i); + rc = obd_create(mds->mds_lmv_exp, oa, NULL, NULL); + LASSERT(rc == 0); + + /* now, add new dir entry for it */ + handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL); + if (IS_ERR(handle)) + GOTO(cleanup, rc = PTR_ERR(handle)); + rc = fsfilt_add_dir_entry(obd, dparent, rec->ur_name, + rec->ur_namelen - 1, + oa->o_id, oa->o_generation, + i); + LASSERT(rc == 0); + + /* fill reply */ + body = lustre_msg_buf(req->rq_repmsg, + offset, sizeof (*body)); + body->valid |= OBD_MD_FLID | OBD_MD_MDS; + body->fid1.id = oa->o_id; + body->fid1.mds = i; + body->fid1.generation = oa->o_generation; + obdo_free(oa); + } else { + /* requested name exists in the directory */ + rc = -EEXIST; } EXIT; break; @@ -683,7 +733,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, if (rc) { CDEBUG(D_INODE, "error during create: %d\n", rc); GOTO(cleanup, rc); - } else { + } else if (dchild->d_inode) { struct iattr iattr; struct inode *inode = dchild->d_inode; struct mds_body *body; @@ -1249,7 +1299,6 @@ int mds_create_local_dentry(struct mds_update_record *rec, char *fidname = rec->ur_name; struct dentry *child = NULL; struct lustre_handle lockh; - unsigned mode; void *handle; 
ENTRY; @@ -1301,8 +1350,13 @@ int mds_create_local_dentry(struct mds_update_record *rec, CERROR("error linking orphan %lu/%lu to FIDS: rc = %d\n", (unsigned long) child->d_inode->i_ino, (unsigned long) child->d_inode->i_generation, rc); - else + else { + if (S_ISDIR(child->d_inode->i_mode)) { + fids_dir->i_nlink++; + mark_inode_dirty(fids_dir); + } mark_inode_dirty(child->d_inode); + } fsfilt_commit(obd, fids_dir, handle, 0); rec->ur_fid1->id = fids_dir->i_ino; @@ -1325,7 +1379,6 @@ cleanup: static int mds_copy_unlink_reply(struct ptlrpc_request *master, struct ptlrpc_request *slave) { - struct lov_mds_md *eadata; void *cookie, *cookie2; struct mds_body *body2; struct mds_body *body; @@ -1339,7 +1392,6 @@ static int mds_copy_unlink_reply(struct ptlrpc_request *master, LASSERT(body2 != NULL); if (!(body->valid & (OBD_MD_FLID | OBD_MD_FLGENER))) { - CWARN("empty reply\n"); RETURN(0); } @@ -1395,8 +1447,6 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset, LASSERT(offset == 0 || offset == 2); - DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s", - rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name); DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode %u/%u/%u)\n", rec->ur_namelen - 1, rec->ur_name, (unsigned)dchild->d_mdsnum, (unsigned) dchild->d_inum, (unsigned) dchild->d_generation); @@ -1480,7 +1530,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, LASSERT(unlink_by_fid == 0); LASSERT(dchild->d_mdsnum != mds->mds_num); mds_reint_unlink_remote(rec, offset, req, parent_lockh, - dparent, &child_lockh, dchild); + dparent, &child_lockh, dchild); RETURN(0); } @@ -2143,14 +2193,18 @@ static int mds_get_parents_children_locked(struct obd_device *obd, cleanup_phase = 3; /* original name dentry */ inode = (*de_oldp)->d_inode; - if (inode != NULL) + if (inode != NULL) { inode = igrab(inode); - if (inode == NULL) - GOTO(cleanup, rc = -ENOENT); + if (inode == NULL) + GOTO(cleanup, rc = -ENOENT); - 
c1_res_id.name[0] = inode->i_ino; - c1_res_id.name[1] = inode->i_generation; - iput(inode); + c1_res_id.name[0] = inode->i_ino; + c1_res_id.name[1] = inode->i_generation; + iput(inode); + } else if ((*de_oldp)->d_flags & DCACHE_CROSS_REF) { + c1_res_id.name[0] = (*de_oldp)->d_inum; + c1_res_id.name[1] = (*de_oldp)->d_generation; + } /* Step 4: Lookup the target child entry */ *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1); @@ -2164,15 +2218,18 @@ static int mds_get_parents_children_locked(struct obd_device *obd, cleanup_phase = 4; /* target dentry */ inode = (*de_newp)->d_inode; - if (inode != NULL) + if (inode != NULL) { inode = igrab(inode); - if (inode == NULL) - goto retry_locks; - - c2_res_id.name[0] = inode->i_ino; - c2_res_id.name[1] = inode->i_generation; + if (inode == NULL) + goto retry_locks; - iput(inode); + c2_res_id.name[0] = inode->i_ino; + c2_res_id.name[1] = inode->i_generation; + iput(inode); + } else if ((*de_newp)->d_flags & DCACHE_CROSS_REF) { + c2_res_id.name[0] = (*de_newp)->d_inum; + c2_res_id.name[1] = (*de_newp)->d_generation; + } retry_locks: /* Step 5: Take locks on the parents and child(ren) */ @@ -2213,7 +2270,7 @@ retry_locks: GOTO(cleanup, rc); } - if ((*de_oldp)->d_inode == NULL) + if (!DENTRY_VALID(*de_oldp)) GOTO(cleanup, rc = -ENOENT); /* Step 6b: Re-lookup target child to verify it hasn't changed */ diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index b967b92e39..20f9d943f4 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -439,7 +439,6 @@ EXPORT_SYMBOL(class_detach); /* mea.c */ EXPORT_SYMBOL(mea_name2idx); -int raw_name2idx(int count, char *name, int namelen); EXPORT_SYMBOL(raw_name2idx); #ifdef LPROCFS diff --git a/lustre/obdclass/mea.c b/lustre/obdclass/mea.c index ceba64ee1d..56fc149ee5 100644 --- a/lustre/obdclass/mea.c +++ b/lustre/obdclass/mea.c @@ -54,7 +54,7 @@ int mea_name2idx(struct mea *mea, char *name, int namelen) return c; } -int 
raw_name2idx(int count, char *name, int namelen) +int raw_name2idx(int count, const char *name, int namelen) { unsigned int c; -- GitLab