Commit f9a2da63 authored by Lai Siyao's avatar Lai Siyao Committed by Oleg Drokin
Browse files

LU-13437 mdt: don't fetch LOOKUP lock for remote object



Pack parent FID in getattr by FID, which will be used to check whether
child is remote object on parent. The helper function is called
mdt_is_remote_object(). NB, directory shard is not treated as remote
object, because if so, client needs to revalidate shards when dir is
accessed, which will hurt performance much.

For getattr by FID, if object is remote file on parent, don't fetch
LOOKUP lock, otherwise client may see stale dir entries.
Signed-off-by: default avatarLai Siyao <lai.siyao@whamcloud.com>
Change-Id: Id181ecc053579ee394080381a82706334503ced0
Reviewed-on: https://review.whamcloud.com/38561

Tested-by: default avatarjenkins <devops@whamcloud.com>
Tested-by: default avatarMaloo <maloo@whamcloud.com>
Reviewed-by: default avatarAndreas Dilger <adilger@whamcloud.com>
Reviewed-by: default avatarYingjin Qian <qian@ddn.com>
Reviewed-by: default avatarOleg Drokin <green@whamcloud.com>
parent e1bf3787
......@@ -1152,7 +1152,7 @@ struct md_ops {
int (*m_free_lustre_md)(struct obd_export *, struct lustre_md *);
int (*m_merge_attr)(struct obd_export *,
int (*m_merge_attr)(struct obd_export *, const struct lu_fid *fid,
const struct lmv_stripe_md *lsm,
struct cl_attr *attr, ldlm_blocking_callback);
......
......@@ -1591,6 +1591,7 @@ static inline int md_free_lustre_md(struct obd_export *exp,
}
static inline int md_merge_attr(struct obd_export *exp,
const struct lu_fid *fid,
const struct lmv_stripe_md *lsm,
struct cl_attr *attr,
ldlm_blocking_callback cb)
......@@ -1601,7 +1602,7 @@ static inline int md_merge_attr(struct obd_export *exp,
if (rc)
return rc;
return MDP(exp->exp_obd, merge_attr)(exp, lsm, attr, cb);
return MDP(exp->exp_obd, merge_attr)(exp, fid, lsm, attr, cb);
}
static inline int md_setxattr(struct obd_export *exp, const struct lu_fid *fid,
......
......@@ -4665,8 +4665,8 @@ static int ll_inode_revalidate(struct dentry *dentry, enum ldlm_intent_flags op)
PFID(ll_inode2fid(inode)), inode, dentry->d_name.name);
/* Call getattr by fid, so do not provide name at all. */
op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
LUSTRE_OPC_ANY, NULL);
op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode, inode,
NULL, 0, 0, LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
......@@ -4712,7 +4712,7 @@ static int ll_merge_md_attr(struct inode *inode)
RETURN(0);
down_read(&lli->lli_lsm_sem);
rc = md_merge_attr(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
rc = md_merge_attr(ll_i2mdexp(inode), &lli->lli_fid, lli->lli_lsm_md,
&attr, ll_md_blocking_ast);
up_read(&lli->lli_lsm_sem);
if (rc != 0)
......
......@@ -1599,8 +1599,8 @@ static int ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
GOTO(unlock, rc = -ENOMEM);
/* validate the lsm */
rc = md_merge_attr(ll_i2mdexp(inode), lli->lli_lsm_md, attr,
ll_md_blocking_ast);
rc = md_merge_attr(ll_i2mdexp(inode), &lli->lli_fid, lli->lli_lsm_md,
attr, ll_md_blocking_ast);
if (!rc) {
if (md->body->mbo_valid & OBD_MD_FLNLINK)
md->body->mbo_nlink = attr->cat_nlink;
......
......@@ -152,6 +152,7 @@ out:
}
int lmv_revalidate_slaves(struct obd_export *exp,
const struct lu_fid *pfid,
const struct lmv_stripe_md *lsm,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags)
......@@ -198,7 +199,7 @@ int lmv_revalidate_slaves(struct obd_export *exp,
* which is not needed here.
*/
memset(op_data, 0, sizeof(*op_data));
op_data->op_fid1 = fid;
op_data->op_fid1 = *pfid;
op_data->op_fid2 = fid;
tgt = lmv_tgt(lmv, lsm->lsm_md_oinfo[i].lmo_mds);
......@@ -439,13 +440,18 @@ lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
}
retry:
tgt = lmv_locate_tgt(lmv, op_data);
if (op_data->op_name) {
tgt = lmv_locate_tgt(lmv, op_data);
if (!fid_is_sane(&op_data->op_fid2))
fid_zero(&op_data->op_fid2);
} else if (fid_is_sane(&op_data->op_fid2)) {
tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
} else {
tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
}
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
if (!fid_is_sane(&op_data->op_fid2))
fid_zero(&op_data->op_fid2);
CDEBUG(D_INODE, "LOOKUP_INTENT with fid1="DFID", fid2="DFID
", name='%s' -> mds #%u\n",
PFID(&op_data->op_fid1), PFID(&op_data->op_fid2),
......@@ -463,7 +469,8 @@ retry:
/* If RPC happens, lsm information will be revalidated
* during update_inode process (see ll_update_lsm_md) */
if (lmv_dir_striped(op_data->op_mea2)) {
rc = lmv_revalidate_slaves(exp, op_data->op_mea2,
rc = lmv_revalidate_slaves(exp, &op_data->op_fid2,
op_data->op_mea2,
cb_blocking,
extra_lock_flags);
if (rc != 0)
......
......@@ -53,6 +53,7 @@ int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp,
struct lu_fid *fid, struct md_op_data *op_data);
int lmv_revalidate_slaves(struct obd_export *exp,
const struct lu_fid *pfid,
const struct lmv_stripe_md *lsm,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags);
......
......@@ -3552,6 +3552,7 @@ int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
}
static int lmv_merge_attr(struct obd_export *exp,
const struct lu_fid *fid,
const struct lmv_stripe_md *lsm,
struct cl_attr *attr,
ldlm_blocking_callback cb_blocking)
......@@ -3562,7 +3563,7 @@ static int lmv_merge_attr(struct obd_export *exp,
if (!lmv_dir_striped(lsm))
return 0;
rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0);
rc = lmv_revalidate_slaves(exp, fid, lsm, cb_blocking, 0);
if (rc < 0)
return rc;
......
......@@ -1838,14 +1838,14 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
__u64 child_bits,
struct ldlm_reply *ldlm_rep)
{
struct ptlrpc_request *req = mdt_info_req(info);
struct mdt_body *reqbody = NULL;
struct mdt_object *parent = info->mti_object;
struct mdt_object *child;
struct lu_fid *child_fid = &info->mti_tmp_fid1;
struct lu_name *lname = NULL;
struct ptlrpc_request *req = mdt_info_req(info);
struct mdt_body *reqbody = NULL;
struct mdt_object *parent = info->mti_object;
struct mdt_object *child = NULL;
struct lu_fid *child_fid = &info->mti_tmp_fid1;
struct lu_name *lname = NULL;
struct mdt_lock_handle *lhp = NULL;
struct ldlm_lock *lock;
struct ldlm_lock *lock;
struct req_capsule *pill = info->mti_pill;
__u64 try_bits = 0;
bool is_resent;
......@@ -1918,6 +1918,13 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
if (lu_name_is_valid(lname)) {
if (mdt_object_remote(parent)) {
CERROR("%s: parent "DFID" is on remote target\n",
mdt_obd_name(info->mti_mdt),
PFID(mdt_object_fid(parent)));
RETURN(-EPROTO);
}
CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
"ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
PNAME(lname), ldlm_rep);
......@@ -1927,10 +1934,33 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
RETURN(err_serious(-EPROTO));
*child_fid = reqbody->mbo_fid2;
if (unlikely(!fid_is_sane(child_fid)))
RETURN(err_serious(-EINVAL));
if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
mdt_object_get(info->mti_env, parent);
child = parent;
} else {
child = mdt_object_find(info->mti_env, info->mti_mdt,
child_fid);
if (IS_ERR(child))
RETURN(PTR_ERR(child));
}
if (mdt_object_remote(child)) {
CERROR("%s: child "DFID" is on remote target\n",
mdt_obd_name(info->mti_mdt),
PFID(mdt_object_fid(child)));
GOTO(out_child, rc = -EPROTO);
}
/* don't fetch LOOKUP lock if it's remote object */
rc = mdt_is_remote_object(info, parent, child);
if (rc < 0)
GOTO(out_child, rc);
if (rc)
child_bits &= ~MDS_INODELOCK_LOOKUP;
CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
"ldlm_rep = %p\n",
PFID(mdt_object_fid(parent)),
......@@ -1943,14 +1973,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
LU_OBJECT_DEBUG(D_INODE, info->mti_env,
&parent->mot_obj,
"Parent doesn't exist!");
RETURN(-ESTALE);
}
if (mdt_object_remote(parent)) {
CERROR("%s: parent "DFID" is on remote target\n",
mdt_obd_name(info->mti_mdt),
PFID(mdt_object_fid(parent)));
RETURN(-EIO);
GOTO(out_child, rc = -ESTALE);
}
if (lu_name_is_valid(lname)) {
......@@ -1984,30 +2007,18 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
if (rc != 0)
GOTO(out_parent, rc);
}
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
GOTO(unlock_parent, rc);
/*
*step 3: find the child object by fid & lock it.
* regardless if it is local or remote.
*
*Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e)
* set parent dir fid the same as child fid in getattr by fid case
* we should not lu_object_find() the object again, could lead
* to hung if there is a concurrent unlink destroyed the object.
*/
if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
mdt_object_get(info->mti_env, parent);
child = parent;
} else {
child = mdt_object_find(info->mti_env, info->mti_mdt,
child_fid);
if (unlikely(IS_ERR(child)))
GOTO(unlock_parent, rc = PTR_ERR(child));
}
if (unlikely(IS_ERR(child)))
GOTO(out_parent, rc = PTR_ERR(child));
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
/* step 3: lock child regardless if it is local or remote. */
LASSERT(child);
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
if (!mdt_object_exists(child)) {
......@@ -2116,15 +2127,16 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
unlock_res_and_lock(lock);
}
LDLM_LOCK_PUT(lock);
GOTO(out_parent, rc = 0);
GOTO(unlock_parent, rc = 0);
}
LDLM_LOCK_PUT(lock);
}
EXIT;
out_child:
mdt_object_put(info->mti_env, child);
out_parent:
if (child)
mdt_object_put(info->mti_env, child);
unlock_parent:
if (lhp)
mdt_object_unlock(info, parent, lhp, 1);
return rc;
......
......@@ -559,6 +559,8 @@ struct mdt_thread_info {
/* FLR: layout change API */
struct md_layout_change mti_mlc;
struct lu_seq_range mti_range;
};
extern struct lu_context_key mdt_thread_key;
......@@ -1430,5 +1432,8 @@ void mdt_restripe_migrate_add(struct mdt_thread_info *info,
struct mdt_object *o);
void mdt_restripe_update_add(struct mdt_thread_info *info,
struct mdt_object *o);
int mdt_is_remote_object(struct mdt_thread_info *info,
struct mdt_object *parent,
struct mdt_object *child);
#endif /* _MDT_INTERNAL_H */
......@@ -1832,3 +1832,134 @@ int mdt_pack_secctx_in_reply(struct mdt_thread_info *info,
}
return rc;
}
/* check whether two FIDs belong to different MDT. */
static int mdt_fids_different_target(struct mdt_thread_info *info,
const struct lu_fid *fid1,
const struct lu_fid *fid2)
{
const struct lu_env *env = info->mti_env;
struct mdt_device *mdt = info->mti_mdt;
struct lu_seq_range *range = &info->mti_range;
struct seq_server_site *ss;
__u32 index1, index2;
int rc;
if (fid_seq(fid1) == fid_seq(fid2))
return 0;
ss = mdt->mdt_lu_dev.ld_site->ld_seq_site;
range->lsr_flags = LU_SEQ_RANGE_MDT;
rc = fld_server_lookup(env, ss->ss_server_fld, fid1->f_seq, range);
if (rc)
return rc;
index1 = range->lsr_index;
rc = fld_server_lookup(env, ss->ss_server_fld, fid2->f_seq, range);
if (rc)
return rc;
index2 = range->lsr_index;
return index1 != index2;
}
static bool mdt_object_is_shard(struct mdt_thread_info *info,
struct mdt_object *obj)
{
struct lmv_mds_md_v1 *lmv = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
struct lu_buf buf;
int rc;
if (!mdt_object_exists(obj))
return false;
if (!S_ISDIR(lu_object_attr(&obj->mot_obj)))
return false;
buf.lb_buf = lmv;
buf.lb_len = sizeof(*lmv);
rc = mo_xattr_get(info->mti_env, mdt_object_child(obj), &buf,
XATTR_NAME_LMV);
if (rc < 0)
return false;
return lmv->lmv_magic == cpu_to_le32(LMV_MAGIC_STRIPE);
}
/**
* Check whether \a child is remote object on \a parent.
*
* \param[in] info thread environment
* \param[in] parent parent object, it's the same as child object in
* getattr_by_fid
* \param[in] child child object
*
* \retval 1 is remote object.
* \retval 0 isn't remote object.
* \retval < 1 error code
*/
int mdt_is_remote_object(struct mdt_thread_info *info,
struct mdt_object *parent,
struct mdt_object *child)
{
struct lu_buf *buf = &info->mti_big_buf;
struct linkea_data ldata = { NULL };
struct link_ea_header *leh;
struct link_ea_entry *lee;
struct lu_name name;
struct lu_fid pfid;
int reclen;
int i;
int rc;
ENTRY;
if (fid_is_root(mdt_object_fid(child)))
RETURN(0);
if (likely(parent != child)) {
if (mdt_object_remote(parent) ^ mdt_object_remote(child)) {
/* don't treat shard as remote object, otherwise client
* need to revalidate shards all the time.
*/
if (mdt_object_is_shard(info, child))
RETURN(0);
RETURN(1);
}
if (!mdt_object_remote(parent) && !mdt_object_remote(child))
RETURN(0);
rc = mdt_fids_different_target(info, mdt_object_fid(parent),
mdt_object_fid(child));
RETURN(rc);
}
/* client < 2.13.52 getattr_by_fid parent and child are the same */
buf = lu_buf_check_and_alloc(buf, PATH_MAX);
if (!buf->lb_buf)
RETURN(-ENOMEM);
ldata.ld_buf = buf;
rc = mdt_links_read(info, child, &ldata);
/* can't read linkea, just assume it's remote object */
if (rc == -ENOENT || rc == -ENODATA)
RETURN(1);
if (rc)
RETURN(rc);
leh = buf->lb_buf;
lee = (struct link_ea_entry *)(leh + 1);
for (i = 0; i < leh->leh_reccount; i++) {
linkea_entry_unpack(lee, &reclen, &name, &pfid);
lee = (struct link_ea_entry *) ((char *)lee + reclen);
if (mdt_fids_different_target(info, &pfid,
mdt_object_fid(child)))
RETURN(1);
}
RETURN(0);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment