From 8abc9a862244c4f671489d82bb858707bcaacdab Mon Sep 17 00:00:00 2001
From: shadow <shadow>
Date: Thu, 23 Oct 2008 19:12:44 +0000
Subject: [PATCH] in rare cases, inode in catalog can have i_no less than have
 parent i_no, this produce wrong order for locking during open, and parallel
 unlink can be lock open. this need teach mds_open to grab locks in resouce id
 order, not at parent -> child order.

Branch b1_6
b=16492
i=johann
i=alex
---
 lustre/ChangeLog       |   9 ++++
 lustre/mds/mds_open.c  | 110 +++++++++++++++++++++++------------------
 lustre/mds/mds_reint.c |  19 ++++++-
 3 files changed, 88 insertions(+), 50 deletions(-)

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 63701475b8..ecce0d8cf6 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -40,6 +40,15 @@ tbd Sun Microsystems, Inc.
 	* File join has been disabled in this release, refer to Bugzilla
 	  16929.
 
+Severity   : major
+Frequency  : rare
+Bugzilla   : 16492
+Description: mds is deadlocked
+Details    : in rare cases, inode in catalog can have i_no less than have parent
+             i_no, this produce wrong order for locking during open, and parallel
+             unlink can be lock open. this need teach mds_open to grab locks in
+             resouce id order, not at parent -> child order.
+
 Severity   : enhancement
 Bugzilla   : 1819
 Description: Add /proc entry for import status
diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c
index 8b4e3f11ca..10e2ef3218 100644
--- a/lustre/mds/mds_open.c
+++ b/lustre/mds/mds_open.c
@@ -929,10 +929,9 @@ int mds_open(struct mds_update_record *rec, int offset,
         int child_mode = LCK_CR;
         /* Always returning LOOKUP lock if open succesful to guard
            dentry on client. */
-        ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_LOOKUP}};
-        struct ldlm_res_id child_res_id = { .name = {0}};
         int lock_flags = 0;
         int rec_pending = 0;
+        int use_parent, need_open_lock;
         unsigned int gid = current->fsgid;
         ENTRY;
 
@@ -1006,17 +1005,45 @@ int mds_open(struct mds_update_record *rec, int offset,
                 RETURN(-ENOMEM);
         }
 
-        /* Step 1: Find and lock the parent */
         if (rec->ur_flags & (MDS_OPEN_CREAT | MDS_OPEN_JOIN_FILE))
                 parent_mode = LCK_EX;
-        dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
-                                        &parent_lockh, rec->ur_name,
-                                        rec->ur_namelen - 1,
-                                        MDS_INODELOCK_UPDATE);
-        if (IS_ERR(dparent)) {
-                rc = PTR_ERR(dparent);
+
+        /* We cannot use acc_mode here, because it is zeroed in case of
+           creating a file, so we get wrong lockmode */
+        if (rec->ur_flags & FMODE_WRITE)
+                child_mode = LCK_CW;
+        else if (rec->ur_flags & MDS_FMODE_EXEC)
+                child_mode = LCK_PR;
+        else
+                child_mode = LCK_CR;
+
+        /* join file and nfsd can't need lookup dchild as use parent for it */
+        use_parent = (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
+                     (rec->ur_flags & MDS_OPEN_LOCK) && (rec->ur_namelen == 1)) ||
+                     (rec->ur_flags & MDS_OPEN_JOIN_FILE);
+
+        need_open_lock = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
+                           (rec->ur_flags & MDS_OPEN_LOCK);
+
+        /* Try to lock both parent and child first. If child is not found,
+         * return only locked parent.  This is enough to prevent other
+         * threads from changing this directory until creation is finished. */
+        rc = mds_get_parent_child_locked(obd, &obd->u.mds,
+                                         rec->ur_fid1,
+                                         &parent_lockh,
+                                         &dparent, parent_mode,
+                                         MDS_INODELOCK_UPDATE,
+                                         use_parent ? NULL : rec->ur_name,
+                                         rec->ur_namelen,
+                                         (rec->ur_flags & MDS_OPEN_LOCK) ?
+                                                child_lockh : NULL,
+                                         &dchild, child_mode,
+                                         MDS_INODELOCK_LOOKUP |
+                                         MDS_INODELOCK_OPEN);
+
+        if (rc) {
                 if (rc != -ENOENT) {
-                        CERROR("parent "LPU64"/%u lookup error %d\n",
+                        CERROR("parent "LPU64"/%u lookup/take lock error %d\n",
                                rec->ur_fid1->id, rec->ur_fid1->generation, rc);
                 } else {
                         /* Just cannot find parent - make it look like
@@ -1030,34 +1057,21 @@ int mds_open(struct mds_update_record *rec, int offset,
 
         cleanup_phase = 1; /* parent dentry and lock */
 
-        if (rec->ur_flags & MDS_OPEN_JOIN_FILE) {
+        if (use_parent) {
                 dchild = dget(dparent);
-                cleanup_phase = 2; /* child dentry */
-                acc_mode = accmode(dchild->d_inode, rec->ur_flags);
-                GOTO(found_child, rc);
         }
 
-        /* Step 2: Lookup the child */
-
-        if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
-            (rec->ur_flags & MDS_OPEN_LOCK) && (rec->ur_namelen == 1)) {
-                /* hack for nfsd with no_subtree_check, it will use anon
-                 * dentry w/o filename to open the file. the anon dentry's
-                 * parent was set to itself, so rec->ur_fid1 is the file.
-                 * And in MDC it cannot derive the dentry's parent dentry,
-                 * hence the file's name, so we hack here in MDS,
-                 * refer to bug 13030. */
-                dchild = mds_fid2dentry(mds, rec->ur_fid1, NULL);
-        } else {
-                dchild = ll_lookup_one_len(rec->ur_name, dparent,
-                                           rec->ur_namelen - 1);
-        }
         if (IS_ERR(dchild)) {
                 rc = PTR_ERR(dchild);
                 dchild = NULL; /* don't confuse mds_finish_transno() below */
                 GOTO(cleanup, rc);
         }
 
+        if (rec->ur_flags & MDS_OPEN_JOIN_FILE) {
+                acc_mode = accmode(dchild->d_inode, rec->ur_flags);
+                GOTO(found_child, rc);
+        }
+
         cleanup_phase = 2; /* child dentry */
 
         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
@@ -1150,6 +1164,14 @@ int mds_open(struct mds_update_record *rec, int offset,
                 acc_mode = 0;           /* Don't check for permissions */
         } else {
                 acc_mode = accmode(dchild->d_inode, rec->ur_flags);
+                /* Child previously existed so the lookup and lock is already
+                 * done, so no further locking is needed. */
+                /* for nfs and join - we need two locks for same fid, but
+                 * with different mode */
+                if (need_open_lock && !use_parent)  {
+                        intent_set_disposition(rep, DISP_OPEN_LOCK);
+                        need_open_lock = 0;
+                }
         }
 
         LASSERTF(!mds_inode_is_orphan(dchild->d_inode),
@@ -1213,20 +1235,10 @@ found_child:
                 GOTO(cleanup, rc = -EAGAIN);
         }
 
-        /* Obtain OPEN lock as well */
-        policy.l_inodebits.bits |= MDS_INODELOCK_OPEN;
-
-        /* We cannot use acc_mode here, because it is zeroed in case of
-           creating a file, so we get wrong lockmode */
-        if (rec->ur_flags & FMODE_WRITE)
-                child_mode = LCK_CW;
-        else if (rec->ur_flags & MDS_FMODE_EXEC)
-                child_mode = LCK_PR;
-        else
-                child_mode = LCK_CR;
+        if (need_open_lock) {
+                ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN } };
+                struct ldlm_res_id child_res_id;
 
-        if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
-             (rec->ur_flags & MDS_OPEN_LOCK)) {
                 /* In case of replay we do not get a lock assuming that the
                    caller has it already */
                 child_res_id.name[0] = dchild->d_inode->i_ino;
@@ -1242,7 +1254,6 @@ found_child:
 
                 /* Let mds_intent_policy know that we have a lock to return */
                 intent_set_disposition(rep, DISP_OPEN_LOCK);
-                cleanup_phase = 3;
         }
 
         if (!S_ISREG(dchild->d_inode->i_mode) &&
@@ -1267,11 +1278,6 @@ found_child:
                 lquota_pending_commit(mds_quota_interface_ref, obd,
                                       current->fsuid, gid, 1);
         switch (cleanup_phase) {
-        case 3:
-                if (rc)
-                        /* It is safe to leave IT_OPEN_LOCK set, if rc is not 0,
-                         * mds_intent_policy won't try to return any locks */
-                        ldlm_lock_decref(child_lockh, child_mode);
         case 2:
                 if (rc && created) {
                         int err;
@@ -1290,8 +1296,14 @@ found_child:
                         qpids[USRQUOTA] = dparent->d_inode->i_uid;
                         qpids[GRPQUOTA] = dparent->d_inode->i_gid;
                 }
-                l_dput(dchild);
         case 1:
+                if (dchild) {
+                        l_dput(dchild);
+                        /* It is safe to leave IT_OPEN_LOCK set, if rc is not 0,
+                         * mds_intent_policy won't try to return any locks */
+                        if (rc && child_lockh->cookie)
+                                ldlm_lock_decref(child_lockh, child_mode);
+                }
                 if (dparent == NULL)
                         break;
 
diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c
index 2dc03816b9..29b23dcdb7 100644
--- a/lustre/mds/mds_reint.c
+++ b/lustre/mds/mds_reint.c
@@ -1330,6 +1330,10 @@ static int mds_verify_child(struct obd_device *obd,
         int rc = 0, cleanup_phase = 2; /* parent, child locks */
         ENTRY;
 
+        /* not want child - not check it */
+        if (name == NULL)
+                RETURN(0);
+
         vchild = ll_lookup_one_len(name, dparent, namelen - 1);
         if (IS_ERR(vchild))
                 GOTO(cleanup, rc = PTR_ERR(vchild));
@@ -1344,6 +1348,12 @@ static int mds_verify_child(struct obd_device *obd,
 
                 RETURN(0);
         }
+        /* resouce is changed, but not want child lock, return new child */
+        if (child_lockh == NULL) {
+                dput(dchild);
+                *dchildp = vchild;
+                GOTO(cleanup, rc = 0);
+        }
 
         CDEBUG(D_DLMTRACE, "child inode changed: %p != %p (%lu != "LPU64")\n",
                vchild->d_inode, dchild ? dchild->d_inode : 0,
@@ -1383,6 +1393,7 @@ static int mds_verify_child(struct obd_device *obd,
                         GOTO(cleanup, rc = -EIO);
         } else {
                 memset(child_res_id, 0, sizeof(*child_res_id));
+                memset(child_lockh, 0, sizeof(*child_lockh));
         }
 
         EXIT;
@@ -1417,6 +1428,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
         struct ldlm_res_id parent_res_id = { .name = {0} };
         ldlm_policy_data_t parent_policy = {.l_inodebits = { parent_lockpart }};
         ldlm_policy_data_t child_policy = {.l_inodebits = { child_lockpart }};
+        static struct ldlm_res_id child_res_id_nolock = { .name = {0} };
         struct inode *inode;
         int rc = 0, cleanup_phase = 0;
         ENTRY;
@@ -1437,6 +1449,8 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
 
         cleanup_phase = 1; /* parent dentry */
 
+        if (name == NULL)
+                GOTO(retry_locks, rc);
         /* Step 2: Lookup child (without DLM lock, to get resource name) */
         *dchildp = ll_lookup_one_len(name, *dparentp, namelen - 1);
         if (IS_ERR(*dchildp)) {
@@ -1446,6 +1460,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
         }
 
         cleanup_phase = 2; /* child dentry */
+
         inode = (*dchildp)->d_inode;
         if (inode != NULL) {
                 if (is_bad_inode(inode)) {
@@ -1479,7 +1494,9 @@ retry_locks:
          *         exist, we still have to lock the parent and re-lookup. */
         rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode,
                                    &parent_policy,
-                                   &child_res_id, child_lockh, child_mode,
+                                   child_lockh ? &child_res_id :
+                                                 &child_res_id_nolock,
+                                   child_lockh, child_mode,
                                    &child_policy);
         if (rc)
                 GOTO(cleanup, rc);
-- 
GitLab