From 4ac04380f2d3db63b458ff41d1944a9ce8e2f463 Mon Sep 17 00:00:00 2001
From: yury <yury>
Date: Fri, 27 May 2005 18:19:41 +0000
Subject: [PATCH] - landing MOD (MDS Originated destroy) plus few ldlm and MDS
 fixes.

---
 lustre/include/linux/lustre_dlm.h |   2 +
 lustre/include/linux/obd.h        |   1 +
 lustre/ldlm/ldlm_lock.c           |   1 +
 lustre/ldlm/ldlm_request.c        |  22 +++---
 lustre/ldlm/ldlm_resource.c       |   5 +-
 lustre/llite/file.c               |   9 ---
 lustre/llite/namei.c              |  86 ---------------------
 lustre/lmv/lmv_intent.c           | 122 ++++++++++++++++--------------
 lustre/mds/mds_internal.h         |   7 ++
 lustre/mds/mds_open.c             |  57 ++++++++++++++
 lustre/mds/mds_reint.c            |  42 ++++++----
 lustre/mds/mds_unlink_open.c      |  26 ++++---
 lustre/osc/osc_request.c          |  39 ++++++----
 lustre/ptlrpc/client.c            |   1 -
 lustre/tests/replay-single.sh     |   8 +-
 15 files changed, 218 insertions(+), 210 deletions(-)

diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h
index e55f56b8d3..d3e71e2b7c 100644
--- a/lustre/include/linux/lustre_dlm.h
+++ b/lustre/include/linux/lustre_dlm.h
@@ -255,6 +255,8 @@ struct ldlm_lock {
         unsigned long         l_callback_timeout;
 
         __u32                 l_pid;            /* pid which created this lock */
+
+        struct list_head      l_tmp;
 };
 
 #define LDLM_PLAIN       10
diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h
index 35bd876238..260b94baf9 100644
--- a/lustre/include/linux/obd.h
+++ b/lustre/include/linux/obd.h
@@ -550,6 +550,7 @@ struct obd_trans_info {
         struct llog_cookie       oti_onecookie;
         struct llog_cookie      *oti_logcookies;
         int                      oti_numcookies;
+        int                      oti_async;
 };
 
 static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies)
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index cbabea0a9e..2b0288279a 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -260,6 +260,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         INIT_LIST_HEAD(&lock->l_lru);
         INIT_LIST_HEAD(&lock->l_export_chain);
         INIT_LIST_HEAD(&lock->l_pending_chain);
+        INIT_LIST_HEAD(&lock->l_tmp);
         init_waitqueue_head(&lock->l_waitq);
 
         spin_lock(&resource->lr_namespace->ns_counter_lock);
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 012b31eb7c..0fa697d307 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -648,8 +648,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
  * callback will be performed in this function. */
 int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
 {
-        struct list_head *tmp, *next;
-        struct ldlm_lock *lock;
+        struct ldlm_lock *lock, *next;
         int count, rc = 0;
         LIST_HEAD(cblist);
         ENTRY;
@@ -666,10 +665,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
                 RETURN(0);
         }
 
-        list_for_each_safe(tmp, next, &ns->ns_unused_list) {
-
-                lock = list_entry(tmp, struct ldlm_lock, l_lru);
-
+        list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru) {
                 LASSERT(!lock->l_readers && !lock->l_writers);
 
                 /* Setting the CBPENDING flag is a little misleading, but
@@ -681,19 +677,25 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
 
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 ldlm_lock_remove_from_lru(lock);
+
+                /* We can't re-add to l_lru as it confuses the refcounting in
+                 * ldlm_lock_remove_from_lru() if an AST arrives after we drop
+                 * ns_lock below. We use l_tmp and can't use l_pending_chain as
+                 * it is used on both server and client, even though bug 5666
+                 * says it is used only on server. --umka */
                 if (sync != LDLM_ASYNC || ldlm_bl_to_thread(ns, NULL, lock))                        
-                        list_add(&lock->l_lru, &cblist);
+                        list_add(&lock->l_tmp, &cblist);
 
                 if (--count == 0)
                         break;
         }
         l_unlock(&ns->ns_lock);
 
-        list_for_each_safe(tmp, next, &cblist) {
-                lock = list_entry(tmp, struct ldlm_lock, l_lru);
-                list_del_init(&lock->l_lru);
+        list_for_each_entry_safe(lock, next, &cblist, l_tmp) {
+                list_del_init(&lock->l_tmp);
                 ldlm_handle_bl_callback(ns, NULL, lock);
         }
+
         RETURN(rc);
 }
 
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index 205126852b..ac93ac6c78 100644
--- a/lustre/ldlm/ldlm_resource.c
+++ b/lustre/ldlm/ldlm_resource.c
@@ -150,7 +150,7 @@ static int lprocfs_write_lru_size(struct file *file, const char *buffer,
                 ns->ns_max_unused = 0;
                 ldlm_cancel_lru(ns, LDLM_SYNC);
                 ns->ns_max_unused = tmp;
-                return count;
+	        return count;
         }
 
         tmp = simple_strtoul(dummy, &end, 0);
@@ -161,10 +161,9 @@ static int lprocfs_write_lru_size(struct file *file, const char *buffer,
 
         CDEBUG(D_DLMTRACE, "changing namespace %s max_unused from %u to %u\n",
                ns->ns_name, ns->ns_max_unused, (unsigned int)tmp);
-        ns->ns_max_unused = (unsigned int)tmp;
 
+        ns->ns_max_unused = (unsigned int)tmp;
         ldlm_cancel_lru(ns, LDLM_ASYNC);
-
         return count;
 }
 
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 6704a7c01c..3be6c4800d 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -92,15 +92,6 @@ int ll_md_och_close(struct obd_export *md_exp, struct inode *inode,
                        (unsigned long)inode->i_ino, rc);
         }
 
-        /* objects are destroed on OST only if metadata close was
-         * successful.*/
-        if (rc == 0) {
-                rc = ll_objects_destroy(req, inode, 1);
-                if (rc)
-                        CERROR("inode %lu ll_objects destroy: rc = %d\n",
-                               inode->i_ino, rc);
-        }
-
         ptlrpc_req_finished(req);
         EXIT;
 out:
diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c
index 9d75bf94af..f9be77d797 100644
--- a/lustre/llite/namei.c
+++ b/lustre/llite/namei.c
@@ -780,89 +780,6 @@ static int ll_rmdir_raw(struct nameidata *nd)
         RETURN(rc);
 }
 
-int ll_objects_destroy(struct ptlrpc_request *request,
-                       struct inode *dir, int offset)
-{
-        struct mds_body *body;
-        struct lov_mds_md *eadata;
-        struct lov_stripe_md *lsm = NULL;
-        struct obd_trans_info oti = { 0 };
-        struct obdo *oa;
-        int rc;
-        ENTRY;
-
-        /* req is swabbed so this is safe */
-        body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
-
-        if (!(body->valid & OBD_MD_FLEASIZE))
-                RETURN(0);
-
-        if (body->eadatasize == 0) {
-                CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
-                GOTO(out, rc = -EPROTO);
-        }
-
-        /*
-         * the MDS sent back the EA because we unlinked the last reference to
-         * this file. Use this EA to unlink the objects on the OST. It's opaque
-         * so we don't swab here; we leave it to obd_unpackmd() to check it is
-         * complete and sensible.
-         */
-        eadata = lustre_swab_repbuf(request, 1, body->eadatasize, NULL);
-        LASSERT(eadata != NULL);
-        if (eadata == NULL) {
-                CERROR("Can't unpack MDS EA data\n");
-                GOTO(out, rc = -EPROTO);
-        }
-
-        rc = obd_unpackmd(ll_i2dtexp(dir), &lsm, eadata, body->eadatasize);
-        if (rc < 0) {
-                CERROR("obd_unpackmd: %d\n", rc);
-                GOTO(out, rc);
-        }
-        LASSERT(rc >= sizeof(*lsm));
-
-        oa = obdo_alloc();
-        if (oa == NULL)
-                GOTO(out_free_memmd, rc = -ENOMEM);
-
-        oa->o_id = lsm->lsm_object_id;
-        oa->o_gr = lsm->lsm_object_gr;
-        oa->o_mode = body->mode & S_IFMT;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
-
-        if (body->valid & OBD_MD_FLCOOKIE) {
-                int length = sizeof(struct llog_cookie) *
-                                lsm->lsm_stripe_count;
-                oa->o_valid |= OBD_MD_FLCOOKIE;
-                oti.oti_logcookies =
-                        lustre_msg_buf(request->rq_repmsg, 2, length);
-                if (oti.oti_logcookies == NULL) {
-                        oa->o_valid &= ~OBD_MD_FLCOOKIE;
-                        body->valid &= ~OBD_MD_FLCOOKIE;
-                } else {
-                        /* copy llog cookies to request to replay unlink
-                         * so that the same llog file and records as those created
-                         * during fail can be re-created while doing replay 
-                         */
-                        if (offset >= 0)
-                                memcpy(lustre_msg_buf(request->rq_reqmsg, offset, 0),
-                                       oti.oti_logcookies, length);
-                }
-        }
-
-        rc = obd_destroy(ll_i2dtexp(dir), oa, lsm, &oti);
-        obdo_free(oa);
-        if (rc)
-                CERROR("obd destroy objid "LPX64" error %d\n",
-                       lsm->lsm_object_id, rc);
-        EXIT;
- out_free_memmd:
-        obd_free_memmd(ll_i2dtexp(dir), &lsm);
- out:
-        return rc;
-}
-
 static int ll_unlink_raw(struct nameidata *nd)
 {
         struct inode *dir = nd->dentry->d_inode;
@@ -882,8 +799,6 @@ static int ll_unlink_raw(struct nameidata *nd)
         if (rc)
                 GOTO(out, rc);
         ll_update_times(request, 0, dir);
-        
-        rc = ll_objects_destroy(request, dir, 2);
         EXIT;
 out:
         ptlrpc_req_finished(request);
@@ -919,7 +834,6 @@ static int ll_rename_raw(struct nameidata *srcnd, struct nameidata *tgtnd)
         if (!err) {
                 ll_update_times(request, 0, src);
                 ll_update_times(request, 0, tgt);
-                err = ll_objects_destroy(request, src, 3);
         }
 
         ptlrpc_req_finished(request);
diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c
index 3ff3fbbb75..ea613eeca6 100644
--- a/lustre/lmv/lmv_intent.c
+++ b/lustre/lmv/lmv_intent.c
@@ -57,69 +57,71 @@ static inline void lmv_drop_intent_lock(struct lookup_intent *it)
                                  LUSTRE_IT(it)->it_lock_mode);
 }
 
-int lmv_handle_remote_inode(struct obd_export *exp, void *lmm,
-                            int lmmsize, struct lookup_intent *it,
-                            int flags, struct ptlrpc_request **reqp,
-                            ldlm_blocking_callback cb_blocking)
+int lmv_intent_remote(struct obd_export *exp, void *lmm,
+                      int lmmsize, struct lookup_intent *it,
+                      int flags, struct ptlrpc_request **reqp,
+                      ldlm_blocking_callback cb_blocking)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct ptlrpc_request *req = NULL;
         struct mds_body *body = NULL;
-        int rc = 0;
+        struct lustre_handle plock;
+        struct lustre_id nid;
+        int pmode, rc = 0;
         ENTRY;
 
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
 
-        if (body->valid & OBD_MD_MDS) {
+        if (!(body->valid & OBD_MD_MDS))
+                RETURN(0);
+
+        /*
+         * oh, MDS reports that this is remote inode case i.e. we have to ask
+         * for real attrs on another MDS.
+         */
+        if (it->it_op == IT_LOOKUP || it->it_op == IT_CHDIR) {
                 /*
-                 * oh, MDS reports that this is remote inode case i.e. we have
-                 * to ask for real attrs on another MDS.
+                 * unfortunately, we have to lie to MDC/MDS to retrieve
+                 * attributes llite needs.
                  */
-                struct ptlrpc_request *req = NULL;
-                struct lustre_handle plock;
-                struct lustre_id nid;
-                int pmode;
-
-                if (it->it_op == IT_LOOKUP || it->it_op == IT_CHDIR) {
-                        /*
-                         * unfortunately, we have to lie to MDC/MDS to retrieve
-                         * attributes llite needs.
-                         */
-                        it->it_op = IT_GETATTR;
-                }
+                it->it_op = IT_GETATTR;
+        }
 
-                /* we got LOOKUP lock, but we really need attrs */
-                pmode = LUSTRE_IT(it)->it_lock_mode;
-                if (pmode) {
-                        memcpy(&plock, &LUSTRE_IT(it)->it_lock_handle,
-                               sizeof(plock));
-                        LUSTRE_IT(it)->it_lock_mode = 0;
-                }
+        /* we got LOOKUP lock, but we really need attrs */
+        pmode = LUSTRE_IT(it)->it_lock_mode;
+        if (pmode) {
+                memcpy(&plock, &LUSTRE_IT(it)->it_lock_handle,
+                       sizeof(plock));
+                LUSTRE_IT(it)->it_lock_mode = 0;
+                LUSTRE_IT(it)->it_data = 0;
+        }
 
-                LASSERT((body->valid & OBD_MD_FID) != 0);
+        LASSERT((body->valid & OBD_MD_FID) != 0);
                 
-                nid = body->id1;
-                LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE;
-                rc = md_intent_lock(lmv->tgts[id_group(&nid)].ltd_exp, &nid, NULL,
-                                    0, lmm, lmmsize, NULL, it, flags, &req, cb_blocking);
+        nid = body->id1;
+        LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE;
+        rc = md_intent_lock(lmv->tgts[id_group(&nid)].ltd_exp, &nid,
+                            NULL, 0, lmm, lmmsize, NULL, it, flags,
+                            &req, cb_blocking);
 
-                /*
-                 * llite needs LOOKUP lock to track dentry revocation in order
-                 * to maintain dcache consistency. Thus drop UPDATE lock here
-                 * and put LOOKUP in request.
-                 */
-                if (rc == 0) {
-                        lmv_drop_intent_lock(it);
-                        memcpy(&LUSTRE_IT(it)->it_lock_handle, &plock,
-                               sizeof(plock));
-                        LUSTRE_IT(it)->it_lock_mode = pmode;
-                } else if (pmode)
-                        ldlm_lock_decref(&plock, pmode);
-
-                ptlrpc_req_finished(*reqp);
-                *reqp = req;
+        /*
+         * llite needs LOOKUP lock to track dentry revocation in order to
+         * maintain dcache consistency. Thus drop UPDATE lock here and put
+         * LOOKUP in request.
+         */
+        if (rc == 0) {
+                lmv_drop_intent_lock(it);
+                memcpy(&LUSTRE_IT(it)->it_lock_handle, &plock,
+                       sizeof(plock));
+                LUSTRE_IT(it)->it_lock_mode = pmode;
+        } else if (pmode) {
+                ldlm_lock_decref(&plock, pmode);
         }
+
+        ptlrpc_req_finished(*reqp);
+        *reqp = req;
         RETURN(rc);
 }
 
@@ -174,14 +176,13 @@ repeat:
 
         /* okay, MDS has returned success. Probably name has been resolved in
          * remote inode */
-        rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it,
-                                     flags, reqp, cb_blocking);
+        rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, cb_blocking);
         if (rc != 0) {
                 LASSERT(rc < 0);
 
                 /* 
                  * this is possible, that some userspace application will try to
-                 * open file as directory and we will have error -20 here. As
+                 * open file as directory and we will have -ENOTDIR here. As
                  * this is "usual" situation, we should not print error here,
                  * only debug info.
                  */
@@ -195,14 +196,19 @@ repeat:
          * nothing is found, do not access body->id1 as it is zero and thus
          * pointless.
          */
-        if (LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG)
+        if ((LUSTRE_IT(it)->it_disposition & DISP_LOOKUP_NEG) &&
+            !(LUSTRE_IT(it)->it_disposition & DISP_OPEN_CREATE) &&
+            !(LUSTRE_IT(it)->it_disposition & DISP_OPEN_OPEN))
                 RETURN(0);
 
         /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
          * to update them for splitted dir */
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
-        LASSERT((body->valid & OBD_MD_FID) != 0);
+
+        /* could not find object, FID is not present in response. */
+        if (!(body->valid & OBD_MD_FID))
+                RETURN(0);
         
         cid = &body->id1;
         obj = lmv_grab_obj(obd, cid);
@@ -309,8 +315,8 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
  
         /* okay, MDS has returned success. probably name has been
          * resolved in remote inode */
-        rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it,
-                                     flags, reqp, cb_blocking);
+        rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
+                               reqp, cb_blocking);
         if (rc < 0)
                 RETURN(rc);
 
@@ -323,7 +329,10 @@ int lmv_intent_getattr(struct obd_export *exp, struct lustre_id *pid,
                 
         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
         LASSERT(body != NULL);
-        LASSERT((body->valid & OBD_MD_FID) != 0);
+
+        /* could not find object, FID is not present in response. */
+        if (!(body->valid & OBD_MD_FID))
+                RETURN(0);
 
         cid = &body->id1;
         obj2 = lmv_grab_obj(obd, cid);
@@ -560,8 +569,7 @@ repeat:
 
         /* okay, MDS has returned success. Probably name has been resolved in
          * remote inode. */
-        rc = lmv_handle_remote_inode(exp, lmm, lmmsize, it,
-                                     flags, reqp, cb_blocking);
+        rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp, cb_blocking);
 
         if (rc == 0 && (mea = lmv_splitted_dir_body(*reqp, 1))) {
                 /* wow! this is splitted dir, we'd like to handle it */
diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h
index 728e1525b7..0e6dd0f2b1 100644
--- a/lustre/mds/mds_internal.h
+++ b/lustre/mds/mds_internal.h
@@ -118,6 +118,11 @@ void mds_exit_ucred(struct lvfs_ucred *ucred);
 /* mds/mds_unlink_open.c */
 int mds_cleanup_orphans(struct obd_device *obd);
 
+int mds_unlink_object(struct mds_obd *mds, struct inode *inode,
+                      struct lov_mds_md *lmm, int lmm_size,
+                      struct llog_cookie *logcookies,
+                      int log_unlink, int async);
+        
 
 /* mds/mds_log.c */
 int mds_log_op_unlink(struct obd_device *obd, struct inode *inode,
@@ -149,6 +154,8 @@ int mds_revalidate_lov_ea(struct obd_device *obd, struct inode *inode,
                           struct lustre_msg *msg, int offset);
 
 /* mds/mds_open.c */
+int mds_destroy_objects(struct obd_device *obd,
+                        struct inode *inode, int async);
 int mds_query_write_access(struct inode *inode);
 int mds_open(struct mds_update_record *rec, int offset,
              struct ptlrpc_request *req, struct lustre_handle *);
diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c
index 751905e65c..57256fed76 100644
--- a/lustre/mds/mds_open.c
+++ b/lustre/mds/mds_open.c
@@ -500,6 +500,56 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
         RETURN(rc);
 }
 
+int
+mds_destroy_objects(struct obd_device *obd,
+                    struct inode *inode, int async)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lov_mds_md *lmm = NULL;
+        int rc, lmm_size;
+        ENTRY;
+
+        LASSERT(inode != NULL);
+
+        if (inode->i_nlink != 0) {
+                CWARN("attempt to destroy OSS object when "
+                      "i_nlink == %d\n", (int)inode->i_nlink);
+                RETURN(0);
+        }
+        
+        OBD_ALLOC(lmm, mds->mds_max_mdsize);
+        if (lmm == NULL)
+                RETURN(-ENOMEM);
+
+        lmm_size = mds->mds_max_mdsize;
+        rc = mds_get_md(obd, inode, lmm, &lmm_size, 1, 0);
+        if (rc < 0) {
+                CERROR("no stripe info for %lu/%lu inode\n",
+                       (unsigned long)inode->i_ino,
+                       (unsigned long)inode->i_generation);
+                GOTO(out_free_lmm, rc);
+        }
+
+        if (rc > 0) {
+                /* unlink object on OSS (asynchronously if async is set) */
+                rc = mds_unlink_object(mds, inode, lmm, lmm_size,
+                                       NULL, 0, async);
+                if (rc) {
+                        CERROR("error unlinking object on OSS, "
+                               "err %d\n", rc);
+                        GOTO(out_free_lmm, rc);
+                }
+        } else {
+                CDEBUG(D_INODE, "no stripping info found for inode "
+		      "%lu/%lu\n", (unsigned long)inode->i_ino, 
+		      (unsigned long)inode->i_generation);
+        }
+        EXIT;
+out_free_lmm:
+        OBD_FREE(lmm, mds->mds_max_mdsize);
+        return rc;
+}
+
 static void reconstruct_open(struct mds_update_record *rec, int offset,
                              struct ptlrpc_request *req,
                              struct lustre_handle *child_lockh)
@@ -1463,6 +1513,13 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset,
                                       req->rq_repmsg->buflens[2], &lcl) > 0) {
                         reply_body->valid |= OBD_MD_FLCOOKIE;
                 }
+		
+		rc = mds_destroy_objects(obd, inode, 1);
+		if (rc) {
+			CERROR("cannot destroy OSS object on close, err %d\n",
+			       rc);
+			rc = 0;
+		}
 
                 goto out; /* Don't bother updating attrs on unlinked inode */
         }
diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c
index 355eedeeb9..8f1f983eaf 100644
--- a/lustre/mds/mds_reint.c
+++ b/lustre/mds/mds_reint.c
@@ -108,7 +108,7 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
         if (req->rq_export->exp_failed) {
                 CERROR("committing transaction for disconnected client\n");
                 if (handle)
-                        GOTO(out_commit, rc);
+                        GOTO(out_commit, rc = -EIO);
                 RETURN(rc);
         }
 
@@ -1676,11 +1676,11 @@ void mds_reconstruct_generic(struct ptlrpc_request *req)
         mds_req_from_mcd(req, med->med_mcd);
 }
 
-/* If we are unlinking an open file/dir (i.e. creating an orphan) then
- * we instead link the inode into the PENDING directory until it is
- * finally released.  We can't simply call mds_reint_rename() or some
- * part thereof, because we don't have the inode to check for link
- * count/open status until after it is locked.
+/* If we are unlinking an open file/dir (i.e. creating an orphan) then we
+ * instead link the inode into the PENDING directory until it is finally
+ * released. We can't simply call mds_reint_rename() or some part thereof,
+ * because we don't have the inode to check for link count/open status until
+ * after it is locked.
  *
  * For lock ordering, caller must get child->i_sem first, then pending->i_sem
  * before starting journal transaction.
@@ -1728,8 +1728,10 @@ static int mds_orphan_add_link(struct mds_update_record *rec,
                 GOTO(out_dput, rc = 0);
         }
 
-        /* link() is semanticaly-wrong for S_IFDIR, so we set S_IFREG
-         * for linking and return real mode back then -bzzz */
+        /*
+         * link() is semantically wrong for S_IFDIR, so we set S_IFREG for
+         * linking and restore the real mode afterwards -bzzz
+         */
         mode = inode->i_mode;
         inode->i_mode = S_IFREG;
         rc = vfs_link(dentry, pending_dir, pending_child);
@@ -1751,7 +1753,7 @@ static int mds_orphan_add_link(struct mds_update_record *rec,
                 mark_inode_dirty(pending_dir);
         }
 
-        EXIT;
+        GOTO(out_dput, rc = 1);
 out_dput:
         l_dput(pending_child);
         return rc;
@@ -2228,13 +2230,19 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
                 } else if (mds_log_op_unlink(obd, child_inode,
-                                lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
-                                        req->rq_repmsg->buflens[offset + 1],
-                                lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
-                                        req->rq_repmsg->buflens[offset+2], 
-                                &lcl) > 0){
+                                             lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
+                                             req->rq_repmsg->buflens[offset + 1],
+                                             lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
+                                             req->rq_repmsg->buflens[offset + 2], 
+                                             &lcl) > 0){
                         body->valid |= OBD_MD_FLCOOKIE;
                 }
+                
+                rc = mds_destroy_objects(obd, child_inode, 1);
+                if (rc) {
+                        CERROR("can't remove OST object, err %d\n",
+                               rc);
+                }
         }
 
         GOTO(cleanup, rc);
@@ -3485,6 +3493,12 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
                                              &lcl) > 0) {
                         body->valid |= OBD_MD_FLCOOKIE;
                 }
+                
+                rc = mds_destroy_objects(obd, old_inode, 1);
+                if (rc) {
+                        CERROR("can't remove OST object, err %d\n",
+                               rc);
+                }
         }
 
         EXIT;
diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c
index 154efb3b86..0e361b5261 100644
--- a/lustre/mds/mds_unlink_open.c
+++ b/lustre/mds/mds_unlink_open.c
@@ -40,12 +40,15 @@
 
 #include "mds_internal.h"
 
-static int mds_osc_destroy_orphan(struct mds_obd *mds,
-                                  struct inode *inode,
-                                  struct lov_mds_md *lmm,
-                                  int lmm_size,
-                                  struct llog_cookie *logcookies,
-                                  int log_unlink)
+/*
+ * used when destroying orphans and from mds_reint_unlink() when MDS wants to
+ * destroy objects on OSS.
+ */
+int
+mds_unlink_object(struct mds_obd *mds, struct inode *inode,
+                  struct lov_mds_md *lmm, int lmm_size,
+                  struct llog_cookie *logcookies,
+                  int log_unlink, int async)
 {
         struct lov_stripe_md *lsm = NULL;
         struct obd_trans_info oti = { 0 };
@@ -78,11 +81,12 @@ static int mds_osc_destroy_orphan(struct mds_obd *mds,
                 oti.oti_logcookies = logcookies;
         }
 
+        CDEBUG(D_INODE, "destroy OSS object %d/%d\n",
+               (int)oa->o_id, (int)oa->o_gr);
+        
+        oti.oti_async = async;
         rc = obd_destroy(mds->mds_dt_exp, oa, lsm, &oti);
         obdo_free(oa);
-        if (rc)
-                CDEBUG(D_INODE, "destroy orphan objid 0x"LPX64" on ost error "
-                       "%d\n", lsm->lsm_object_id, rc);
 out_free_memmd:
         obd_free_memmd(mds->mds_dt_exp, &lsm);
         RETURN(rc);
@@ -148,8 +152,8 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
                 if (!rc)
                         rc = err;
         } else if (!rc) {
-                rc = mds_osc_destroy_orphan(mds, inode, lmm, lmm_size,
-                                            logcookies, log_unlink);
+                rc = mds_unlink_object(mds, inode, lmm, lmm_size,
+                                       logcookies, log_unlink, 0);
         }
 
         if (logcookies != NULL)
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 2eb95bc82f..4c49b3a50f 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -492,26 +492,33 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         memcpy(&body->oa, oa, sizeof(*oa));
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(request);
-        
-        if (rc == -ENOENT)
+        if (oti != NULL && oti->oti_async) {
+                /* asynchronous destroy: hand the request to ptlrpcd */
+                ptlrpcd_add_req(request);
                 rc = 0;
-        if (rc)
-                GOTO(out, rc);
+        } else {
+                rc = ptlrpc_queue_wait(request);
+        
+                if (rc == -ENOENT)
+                        rc = 0;
 
-        body = lustre_swab_repbuf(request, 0, sizeof(*body),
-                                  lustre_swab_ost_body);
-        if (body == NULL) {
-                CERROR ("Can't unpack body\n");
-                GOTO (out, rc = -EPROTO);
-        }
+                if (rc) {
+                        ptlrpc_req_finished(request);
+                        RETURN(rc);
+                }
 
-        memcpy(oa, &body->oa, sizeof(*oa));
+                body = lustre_swab_repbuf(request, 0, sizeof(*body),
+                                          lustre_swab_ost_body);
+                if (body == NULL) {
+                        CERROR ("Can't unpack body\n");
+                        ptlrpc_req_finished(request);
+                        RETURN(-EPROTO);
+                }
 
-        EXIT;
- out:
-        ptlrpc_req_finished(request);
-        return rc;
+                memcpy(oa, &body->oa, sizeof(*oa));
+                ptlrpc_req_finished(request);
+        }
+        RETURN(rc);
 }
 
 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 57009d4092..2d3f29a9f8 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -540,7 +540,6 @@ static int after_reply(struct ptlrpc_request *req)
         /* Store transno in reqmsg for replay. */
         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
 
-
         if (req->rq_import->imp_replayable) {
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 if (req->rq_transno != 0)
diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh
index 152a24e336..bce9e8471f 100755
--- a/lustre/tests/replay-single.sh
+++ b/lustre/tests/replay-single.sh
@@ -702,13 +702,15 @@ test_32() {
     pid1=$!
     multiop $DIR/$tfile O_c &
     pid2=$!
-    # give multiop a chance to open
-    sleep 1
+    # give multiop a chance to open.
+    # 1 second is not enough, so I increased it to 5; in an ideal world
+    # we would wait for the open to finish in a smarter manner. --umka
+    sleep 5
     mds_evict_client
     df $MOUNT || sleep 1 && df $MOUNT || return 1
     kill -USR1 $pid1
     kill -USR1 $pid2
-    sleep 1
+    sleep 5
     return 0
 }
 run_test 32 "close() notices client eviction; close() after client eviction"
-- 
GitLab