From 5e9b1828ee5a9cb92b85005c0a34962bee56d137 Mon Sep 17 00:00:00 2001
From: zam <zam>
Date: Thu, 6 Nov 2008 18:11:14 +0000
Subject: [PATCH] Branch HEAD b=15393 i=alex.zhuravlev@sun.com i=tappro@sun.com

Commit on sharing. Eliminate inter-client dependencies between
uncommitted transactions by doing transaction commits.
Thereby clients may recover independently.
---
 lnet/lnet/lib-msg.c                |   3 +
 lustre/ChangeLog                   |   6 +
 lustre/include/dt_object.h         |  10 ++
 lustre/include/lustre/lustre_idl.h |   3 +-
 lustre/include/lustre_dlm.h        |   7 +
 lustre/include/lustre_net.h        |   5 +-
 lustre/include/obd_support.h       |   1 +
 lustre/ldlm/ldlm_inodebits.c       |  17 ++-
 lustre/ldlm/ldlm_lib.c             |  25 +++-
 lustre/ldlm/ldlm_lock.c            |  39 +++++-
 lustre/ldlm/ldlm_lockd.c           |   3 +
 lustre/ldlm/ldlm_request.c         |  70 +++++++---
 lustre/lvfs/lvfs_linux.c           |   1 -
 lustre/mdt/mdt_handler.c           | 216 ++++++++++++++++++++++++++---
 lustre/mdt/mdt_internal.h          |  17 ++-
 lustre/mdt/mdt_lproc.c             |  24 ++++
 lustre/mdt/mdt_recovery.c          |   5 +-
 lustre/mdt/mdt_reint.c             |   4 +-
 lustre/mgs/mgs_handler.c           |   2 +-
 lustre/obdclass/obd_config.c       |   8 +-
 lustre/obdecho/echo.c              |   2 +-
 lustre/obdfilter/filter.c          |   2 +-
 lustre/osd/osd_handler.c           |  23 +++
 lustre/ost/ost_handler.c           |   8 +-
 lustre/ptlrpc/events.c             |   4 +-
 lustre/ptlrpc/niobuf.c             |   3 +-
 lustre/ptlrpc/service.c            |  25 +++-
 lustre/ptlrpc/wiretest.c           |   4 +-
 lustre/tests/replay-dual.sh        |  26 ++++
 lustre/tests/test-framework.sh     |  10 ++
 lustre/utils/wiretest.c            |   4 +-
 31 files changed, 499 insertions(+), 78 deletions(-)

diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c
index 28aea30e41..68286b3cec 100644
--- a/lnet/lnet/lib-msg.c
+++ b/lnet/lnet/lib-msg.c
@@ -45,6 +45,8 @@
 void
 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
 {
+        ENTRY;
+
         memset(ev, 0, sizeof(*ev));
 
         ev->status   = 0;
@@ -52,6 +54,7 @@ lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
         ev->type     = LNET_EVENT_UNLINK;
         lnet_md_deconstruct(md, &ev->md);
         lnet_md2handle(&ev->md_handle, md);
+        EXIT;
 }
 
 void
diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 65d8ae74e2..bd543fdb22 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -13,6 +13,12 @@ tbd  Sun Microsystems, Inc.
 	 removed cwd "./" (refer to Bugzilla 14399).
        * File join has been disabled in this release, refer to Bugzilla 16929.
 
+Severity   : enhancement
+Bugzilla   : 15393
+Description: Commit on sharing. Eliminate inter-client dependencies between
+	     uncommitted transactions by doing transaction commits.
+	     Thereby clients may recover independently.
+
 Severity   : normal
 Frequency  : Create a symlink file with a very long name
 Bugzilla   : 16578
diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h
index 760cee5134..b1fa210be1 100644
--- a/lustre/include/dt_object.h
+++ b/lustre/include/dt_object.h
@@ -127,6 +127,16 @@ struct dt_device_operations {
          */
         int   (*dt_sync)(const struct lu_env *env, struct dt_device *dev);
         void  (*dt_ro)(const struct lu_env *env, struct dt_device *dev);
+        /**
+          * Start a transaction commit asynchronously
+          *
+          * \param env environment
+          * \param dev dt_device to start commit on
+          *
+          * \return 0 success, negative value if error
+          */
+         int   (*dt_commit_async)(const struct lu_env *env,
+                                  struct dt_device *dev);
         /**
          * Initialize capability context.
          */
diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h
index 02c19d84f2..c676d4dd43 100644
--- a/lustre/include/lustre/lustre_idl.h
+++ b/lustre/include/lustre/lustre_idl.h
@@ -1728,10 +1728,11 @@ typedef enum {
         LCK_CR      = 16,
         LCK_NL      = 32,
         LCK_GROUP   = 64,
+        LCK_COS     = 128,
         LCK_MAXMODE
 } ldlm_mode_t;
 
-#define LCK_MODE_NUM    7
+#define LCK_MODE_NUM    8
 
 typedef enum {
         LDLM_PLAIN     = 10,
diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 368ee2105f..1bd1c534a3 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -201,6 +201,7 @@ typedef enum {
 #define LCK_COMPAT_CR  (LCK_COMPAT_CW | LCK_PR | LCK_PW)
 #define LCK_COMPAT_NL  (LCK_COMPAT_CR | LCK_EX | LCK_GROUP)
 #define LCK_COMPAT_GROUP  (LCK_GROUP | LCK_NL)
+#define LCK_COMPAT_COS (LCK_COS)
 
 extern ldlm_mode_t lck_compat_array[];
 
@@ -669,6 +670,9 @@ struct ldlm_lock {
          * Server-side-only members. 
          */
 
+        /* connection cookie of the client that originated the operation */
+        __u64                 l_client_cookie;
+
         /** 
          * Protected by elt_lock. Callbacks pending.
          */
@@ -963,6 +967,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                             struct lustre_handle *);
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         __u32 *flags);
+void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode);
 void ldlm_lock_cancel(struct ldlm_lock *lock);
 void ldlm_cancel_locks_for_export(struct obd_export *export);
 void ldlm_reprocess_all(struct ldlm_resource *res);
@@ -1027,6 +1032,7 @@ struct ldlm_callback_suite {
 
 /* ldlm_request.c */
 int ldlm_expired_completion_wait(void *data);
+int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock);
 int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                       void *data, int flag);
 int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp);
@@ -1062,6 +1068,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                            ldlm_completion_callback completion,
                            ldlm_glimpse_callback glimpse,
                            void *data, __u32 lvb_len, void *lvb_swabber,
+                           const __u64 *client_cookie,
                            struct lustre_handle *lockh);
 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
                     void *data, __u32 data_len);
diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index a4377b5cbb..ff6a1228b5 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -260,6 +260,8 @@ struct ptlrpc_reply_state {
 #endif
         /* updates to following flag serialised by srv_request_lock */
         unsigned long          rs_difficult:1;     /* ACK/commit stuff */
+        unsigned long          rs_no_ack:1;    /* no ACK, even for
+                                                  difficult requests */
         unsigned long          rs_scheduled:1;     /* being handled? */
         unsigned long          rs_scheduled_ever:1;/* any schedule attempts? */
         unsigned long          rs_handled:1;  /* been handled yet? */
@@ -661,6 +663,7 @@ struct ptlrpc_service {
         int              srv_watchdog_factor;   /* soft watchdog timeout mutiplier */
         unsigned         srv_cpu_affinity:1;    /* bind threads to CPUs */
         unsigned         srv_at_check:1;        /* check early replies */
+        unsigned         srv_is_stopping:1;     /* under unregister_service */
         cfs_time_t       srv_at_checktime;      /* debug */
 
         __u32            srv_req_portal;
@@ -954,7 +957,7 @@ struct ptlrpc_service_conf {
 
 /* ptlrpc/service.c */
 void ptlrpc_save_lock (struct ptlrpc_request *req,
-                       struct lustre_handle *lock, int mode);
+                       struct lustre_handle *lock, int mode, int no_ack);
 void ptlrpc_commit_replies (struct obd_device *obd);
 void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h
index fbd4a9650a..14bd5a8f50 100644
--- a/lustre/include/obd_support.h
+++ b/lustre/include/obd_support.h
@@ -190,6 +190,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MDS_LOV_SYNC_RACE       0x13e
 #define OBD_FAIL_MDS_CLOSE_NET_REP       0x13f
 #define OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT   0x140
+#define OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS 0x141
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c
index 07014fb3b2..548ee1431e 100644
--- a/lustre/ldlm/ldlm_inodebits.c
+++ b/lustre/ldlm/ldlm_inodebits.c
@@ -86,7 +86,22 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         tmp = mode_tail;
                         continue;
                 }
-                
+
+                if (lock->l_req_mode == LCK_COS) {
+                        if (lock->l_client_cookie == req->l_client_cookie) {
+                                tmp = mode_tail;
+                        } else {
+                                tmp = mode_tail;
+                                if (!work_list)
+                                        RETURN(0);
+                                compat = 0;
+                                if (lock->l_blocking_ast)
+                                        ldlm_add_ast_work_item(lock, req,
+                                                               work_list);
+                        }
+                        continue;
+                }
+
                 for (;;) {
                         struct list_head *head;
 
diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c
index e7397210d2..5e3e9d69ba 100644
--- a/lustre/ldlm/ldlm_lib.c
+++ b/lustre/ldlm/ldlm_lib.c
@@ -1393,6 +1393,11 @@ static int check_for_next_transno(struct obd_device *obd)
                        next_transno, queue_len, completed, connected, req_transno);
                 obd->obd_next_recovery_transno = req_transno;
                 wake_up = 1;
+        } else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS)) {
+                CDEBUG(D_HA, "accepting transno gaps is explicitly allowed"
+                       " by fail_lock, waking up ("LPD64")\n", next_transno);
+                obd->obd_next_recovery_transno = req_transno;
+                wake_up = 1;
         } else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) {
                 /* some clients haven't connected in time, but we can try
                  * to replay requests that demand on already committed ones
@@ -2044,15 +2049,19 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         struct obd_device         *obd;
         struct obd_export         *exp;
         struct ptlrpc_service     *svc;
+        ENTRY;
 
-        if (req->rq_no_reply)
+        if (req->rq_no_reply) {
+                EXIT;
                 return;
+        }
 
         svc = req->rq_rqbd->rqbd_service;
         rs = req->rq_reply_state;
         if (rs == NULL || !rs->rs_difficult) {
                 /* no notifiers */
                 target_send_reply_msg (req, rc, fail_id);
+                EXIT;
                 return;
         }
 
@@ -2082,6 +2091,8 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 
         spin_lock(&obd->obd_uncommitted_replies_lock);
 
+        CDEBUG(D_NET, "rs transno = "LPU64", last committed = "LPU64"\n",
+               rs->rs_transno, obd->obd_last_committed);
         if (rs->rs_transno > obd->obd_last_committed) {
                 /* not committed already */
                 list_add_tail (&rs->rs_obd_list,
@@ -2112,9 +2123,11 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
                 atomic_inc (&svc->srv_outstanding_replies);
         }
 
-        if (!rs->rs_on_net ||                   /* some notifier */
-            list_empty(&rs->rs_exp_list) ||     /* completed already */
-            list_empty(&rs->rs_obd_list)) {
+        if (rs->rs_transno <= obd->obd_last_committed ||
+            (!rs->rs_on_net && !rs->rs_no_ack) ||
+             list_empty(&rs->rs_exp_list) ||     /* completed already */
+             list_empty(&rs->rs_obd_list)) {
+                CDEBUG(D_HA, "Schedule reply immediately\n");
                 list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
                 cfs_waitq_signal (&svc->srv_waitq);
         } else {
@@ -2123,6 +2136,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         }
 
         spin_unlock(&svc->srv_lock);
+        EXIT;
 }
 
 int target_handle_ping(struct ptlrpc_request *req)
@@ -2250,7 +2264,8 @@ ldlm_mode_t lck_compat_array[] = {
         [LCK_CW] LCK_COMPAT_CW,
         [LCK_CR] LCK_COMPAT_CR,
         [LCK_NL] LCK_COMPAT_NL,
-        [LCK_GROUP] LCK_COMPAT_GROUP
+        [LCK_GROUP] LCK_COMPAT_GROUP,
+        [LCK_COS] LCK_COMPAT_COS,
 };
 
 /**
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 2b8c9bf6af..1ea9a7b3ea 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -62,7 +62,8 @@ char *ldlm_lockname[] = {
         [LCK_CW] "CW",
         [LCK_CR] "CR",
         [LCK_NL] "NL",
-        [LCK_GROUP] "GROUP"
+        [LCK_GROUP] "GROUP",
+        [LCK_COS] "COS"
 };
 
 char *ldlm_typename[] = {
@@ -592,7 +593,7 @@ void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
                 lock->l_readers++;
                 lu_ref_add_atomic(&lock->l_reference, "reader", lock);
         }
-        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) {
+        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                 lock->l_writers++;
                 lu_ref_add_atomic(&lock->l_reference, "writer", lock);
         }
@@ -648,7 +649,7 @@ void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
                 lu_ref_del(&lock->l_reference, "reader", lock);
                 lock->l_readers--;
         }
-        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) {
+        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                 LASSERT(lock->l_writers > 0);
                 lu_ref_del(&lock->l_reference, "writer", lock);
                 lock->l_writers--;
@@ -1447,10 +1448,10 @@ ldlm_work_bl_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg)
 
         ldlm_lock2desc(lock->l_blocking_lock, &d);
 
-        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-        lock->l_blocking_lock = NULL;
         lock->l_blocking_ast(lock, &d, (void *)arg,
                              LDLM_CB_BLOCKING);
+        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+        lock->l_blocking_lock = NULL;
         LDLM_LOCK_RELEASE(lock);
 
         RETURN(1);
@@ -1739,6 +1740,32 @@ void ldlm_cancel_locks_for_export(struct obd_export *exp)
                                    ldlm_cancel_locks_for_export_cb, exp);
 }
 
+/**
+ * Downgrade an exclusive lock.
+ *
+ * A fast variant of ldlm_lock_convert for conversion of exclusive
+ * locks. The conversion is always successful.
+ *
+ * \param lock a lock to convert; must be granted in PW or EX mode
+ * \param new_mode new lock mode; must be LCK_COS
+ */
+void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
+{
+        ENTRY;
+
+        LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
+        LASSERT(new_mode == LCK_COS);
+
+        lock_res_and_lock(lock);
+        ldlm_resource_unlink_lock(lock);
+        lock->l_req_mode = new_mode;
+        ldlm_grant_lock(lock, NULL);
+        unlock_res_and_lock(lock);
+        ldlm_reprocess_all(lock->l_resource);
+
+        EXIT;
+}
+
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         __u32 *flags)
 {
@@ -1763,7 +1790,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
                 RETURN(NULL);
 
-        LASSERTF(new_mode == LCK_PW && lock->l_granted_mode == LCK_PR,
+        LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
 
         lock_res_and_lock(lock);
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index ee466e8b49..7891e7a8c4 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -2410,9 +2410,12 @@ EXPORT_SYMBOL(ldlm_lock_dump_handle);
 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
 EXPORT_SYMBOL(ldlm_lock_allow_match);
+EXPORT_SYMBOL(ldlm_lock_downgrade);
+EXPORT_SYMBOL(ldlm_lock_convert);
 
 /* ldlm_request.c */
 EXPORT_SYMBOL(ldlm_completion_ast_async);
+EXPORT_SYMBOL(ldlm_blocking_ast_nocheck);
 EXPORT_SYMBOL(ldlm_completion_ast);
 EXPORT_SYMBOL(ldlm_blocking_ast);
 EXPORT_SYMBOL(ldlm_glimpse_ast);
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 6bf1efe2a5..95ee14d0f2 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -276,31 +276,22 @@ noreproc:
         RETURN(ldlm_completion_tail(lock));
 }
 
-/*
- * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs.
+/**
+ * A helper to build a blocking ast function
+ *
+ * Perform a common operation for blocking asts:
+ * deferred lock cancellation.
+ *
+ * \param lock the lock blocking or canceling ast was called on
+ * \retval 0
+ * \see mdt_blocking_ast
+ * \see ldlm_blocking_ast
  */
-int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                      void *data, int flag)
+int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock)
 {
         int do_ast;
         ENTRY;
 
-        if (flag == LDLM_CB_CANCELING) {
-                /* Don't need to do anything here. */
-                RETURN(0);
-        }
-
-        lock_res_and_lock(lock);
-        /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
-         * that ldlm_blocking_ast is called just before intent_policy method
-         * takes the ns_lock, then by the time we get the lock, we might not
-         * be the correct blocking function anymore.  So check, and return
-         * early, if so. */
-        if (lock->l_blocking_ast != ldlm_blocking_ast) {
-                unlock_res_and_lock(lock);
-                RETURN(0);
-        }
-
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
         unlock_res_and_lock(lock);
@@ -321,6 +312,42 @@ int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         RETURN(0);
 }
 
+/**
+ * Server blocking AST
+ *
+ * ->l_blocking_ast() callback for LDLM locks acquired by server-side
+ * OBDs.
+ *
+ * \param lock the lock which blocks a request or cancelling lock
+ * \param desc unused
+ * \param data unused
+ * \param flag indicates whether this is a cancelling or blocking callback
+ * \retval 0 or the return value of ldlm_blocking_ast_nocheck()
+ * \see ldlm_blocking_ast_nocheck
+ */
+int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                      void *data, int flag)
+{
+        ENTRY;
+
+        if (flag == LDLM_CB_CANCELING) {
+                /* Don't need to do anything here. */
+                RETURN(0);
+        }
+
+        lock_res_and_lock(lock);
+        /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
+         * that ldlm_blocking_ast is called just before intent_policy method
+         * takes the ns_lock, then by the time we get the lock, we might not
+         * be the correct blocking function anymore.  So check, and return
+         * early, if so. */
+        if (lock->l_blocking_ast != ldlm_blocking_ast) {
+                unlock_res_and_lock(lock);
+                RETURN(0);
+        }
+        RETURN(ldlm_blocking_ast_nocheck(lock));
+}
+
 /*
  * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
  * comment in filter_intent_policy() on why you may need this.
@@ -356,6 +383,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                            ldlm_completion_callback completion,
                            ldlm_glimpse_callback glimpse,
                            void *data, __u32 lvb_len, void *lvb_swabber,
+                           const __u64 *client_cookie,
                            struct lustre_handle *lockh)
 {
         struct ldlm_lock *lock;
@@ -387,6 +415,8 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         unlock_res_and_lock(lock);
         if (policy != NULL)
                 lock->l_policy_data = *policy;
+        if (client_cookie != NULL)
+                lock->l_client_cookie = *client_cookie;
         if (type == LDLM_EXTENT)
                 lock->l_req_extent = policy->l_extent;
 
diff --git a/lustre/lvfs/lvfs_linux.c b/lustre/lvfs/lvfs_linux.c
index 0520730bc5..4c2b1a9b8f 100644
--- a/lustre/lvfs/lvfs_linux.c
+++ b/lustre/lvfs/lvfs_linux.c
@@ -430,7 +430,6 @@ int dev_check_rdonly(lvfs_sbdev_type dev);
 
 void __lvfs_set_rdonly(lvfs_sbdev_type dev, lvfs_sbdev_type jdev)
 {
-        lvfs_sbdev_sync(dev);
         if (jdev && (jdev != dev)) {
                 CDEBUG(D_IOCTL | D_HA, "set journal dev %lx rdonly\n",
                        (long)jdev);
diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c
index 42a6e78200..ffdc67c158 100644
--- a/lustre/mdt/mdt_handler.c
+++ b/lustre/mdt/mdt_handler.c
@@ -1776,6 +1776,108 @@ struct mdt_object *mdt_object_find(const struct lu_env *env,
         RETURN(m);
 }
 
+/**
+ * Asynchronous commit for mdt device.
+ *
+ * Pass the asynchronous commit call down to the bottom dt device.
+ *
+ * \param env environment
+ * \param mdt the mdt device
+ */
+static void mdt_device_commit_async(const struct lu_env *env,
+                                    struct mdt_device *mdt)
+{
+        struct dt_device *dt = mdt->mdt_bottom;
+        int rc;
+
+        rc = dt->dd_ops->dt_commit_async(env, dt);
+        if (unlikely(rc != 0))
+                CWARN("async commit start failed with rc = %d\n", rc);
+}
+
+/**
+ * Mark the lock as "synchronous".
+ *
+ * Mark the lock to defer transaction commit to the unlock time.
+ *
+ * \param lock the lock to mark as "synchronous"
+ *
+ * \see mdt_is_lock_sync
+ * \see mdt_save_lock
+ */
+static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
+{
+        lock->l_ast_data = (void*)1;
+}
+
+/**
+ * Check whether the lock is "synchronous" or not.
+ *
+ * \param lock the lock to check
+ * \retval 1 the lock is "synchronous" (l_ast_data is not NULL)
+ * \retval 0 the lock isn't "synchronous"
+ *
+ * \see mdt_set_lock_sync
+ * \see mdt_save_lock
+ */
+static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
+{
+        return lock->l_ast_data != NULL;
+}
+
+/**
+ * Blocking AST for mdt locks.
+ *
+ * Starts a transaction commit in case of a COS lock conflict, or
+ * defers such a commit to mdt_save_lock.
+ *
+ * \param lock the lock which blocks a request or cancelling lock
+ * \param desc unused
+ * \param data unused
+ * \param flag indicates whether this is a cancelling or blocking callback
+ * \retval 0, or the lu_env_init() error when the commit cannot start
+ * \see ldlm_blocking_ast_nocheck
+ */
+int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                     void *data, int flag)
+{
+        struct obd_device *obd = lock->l_resource->lr_namespace->ns_obd;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+        int rc;
+        ENTRY;
+
+        if (flag == LDLM_CB_CANCELING)
+                RETURN(0);
+        lock_res_and_lock(lock);
+        if (lock->l_blocking_ast != mdt_blocking_ast) {
+                unlock_res_and_lock(lock);
+                RETURN(0);
+        }
+        if (mdt_cos_is_enabled(mdt) &&
+            lock->l_req_mode & (LCK_PW | LCK_EX) &&
+            lock->l_blocking_lock != NULL &&
+            lock->l_client_cookie != lock->l_blocking_lock->l_client_cookie) {
+                mdt_set_lock_sync(lock);
+        }
+        rc = ldlm_blocking_ast_nocheck(lock);
+
+        /* There is no lock conflict if l_blocking_lock == NULL;
+         * that indicates a blocking ast sent from ldlm_lock_decref_internal
+         * when the last reference to a local lock was released */
+        if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
+                struct lu_env env;
+
+                rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+                if (unlikely(rc != 0))
+                        CWARN("lu_env initialization failed with rc = %d,"
+                              "cannot start asynchronous commit\n", rc);
+                else
+                        mdt_device_commit_async(&env, mdt);
+                lu_env_fini(&env);
+        }
+        RETURN(rc);
+}
+
 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                     struct mdt_lock_handle *lh, __u64 ibits, int locality)
 {
@@ -1832,7 +1934,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                          */
                         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
                         rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
-                                          policy, res_id, LDLM_FL_ATOMIC_CB);
+                                          policy, res_id, LDLM_FL_ATOMIC_CB,
+                                          &info->mti_exp->exp_handle.h_cookie);
                         if (unlikely(rc))
                                 RETURN(rc);
                 }
@@ -1852,8 +1955,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
          * fix it up and turn FL_LOCAL flag off.
          */
         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
-                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
-
+                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+                          &info->mti_exp->exp_handle.h_cookie);
         if (rc)
                 GOTO(out, rc);
 
@@ -1865,36 +1968,79 @@ out:
         RETURN(rc);
 }
 
-static inline
-void mdt_save_lock(struct ptlrpc_request *req, struct lustre_handle *h,
+/**
+ * Save a lock within request object.
+ *
+ * Keep the lock referenced until either the client ACK or the
+ * transaction commit happens, or release the lock immediately,
+ * depending on the input parameters. If COS is ON, a write lock is
+ * converted to a COS lock before saving.
+ *
+ * \param info thread info object
+ * \param h lock handle
+ * \param mode lock mode
+ * \param decref force immediate lock releasing
+ */
+static
+void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                    ldlm_mode_t mode, int decref)
 {
         ENTRY;
 
         if (lustre_handle_is_used(h)) {
-                if (decref)
+                if (decref || !info->mti_has_trans ||
+                    !(mode & (LCK_PW | LCK_EX))){
                         mdt_fid_unlock(h, mode);
-                else
-                        ptlrpc_save_lock(req, h, mode);
+                } else {
+                        struct mdt_device *mdt = info->mti_mdt;
+                        struct ldlm_lock *lock = ldlm_handle2lock(h);
+                        struct ptlrpc_request *req = mdt_info_req(info);
+                        int no_ack = 0;
+
+                        LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
+                                 h->cookie);
+                        CDEBUG(D_HA, "request = %p reply state = %p"
+                               " transno = "LPD64"\n",
+                               req, req->rq_reply_state, req->rq_transno);
+                        if (mdt_cos_is_enabled(mdt)) {
+                                no_ack = 1;
+                                ldlm_lock_downgrade(lock, LCK_COS);
+                                mode = LCK_COS;
+                        }
+                        ptlrpc_save_lock(req, h, mode, no_ack);
+                        if (mdt_is_lock_sync(lock)) {
+                                CDEBUG(D_HA, "found sync-lock,"
+                                       " async commit started\n");
+                                mdt_device_commit_async(info->mti_env,
+                                                        mdt);
+                        }
+                        LDLM_LOCK_PUT(lock);
+                }
                 h->cookie = 0ull;
         }
 
         EXIT;
 }
 
-/*
- * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
- * to save this lock in req.  when transaction committed, req will be released,
- * and lock will, too.
+/**
+ * Unlock mdt object.
+ *
+ * Immediately release the regular lock and the PDO lock, or save the
+ * locks in the request and keep them referenced until client ACK or
+ * transaction commit.
+ *
+ * \param info thread info object
+ * \param o mdt object
+ * \param lh mdt lock handle referencing regular and PDO locks
+ * \param decref force immediate lock releasing
  */
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
                        struct mdt_lock_handle *lh, int decref)
 {
-        struct ptlrpc_request *req = mdt_info_req(info);
         ENTRY;
 
-        mdt_save_lock(req, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
-        mdt_save_lock(req, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
+        mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
+        mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
 
         EXIT;
 }
@@ -3388,7 +3534,7 @@ static void mdt_stop_ptlrpc_service(struct mdt_device *m)
                 ptlrpc_unregister_service(m->mdt_fld_service);
                 m->mdt_fld_service = NULL;
         }
-        ENTRY;
+        EXIT;
 }
 
 static int mdt_start_ptlrpc_service(struct mdt_device *m)
@@ -3944,6 +4090,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         m->mdt_opts.mo_user_xattr = 0;
         m->mdt_opts.mo_acl = 0;
+        m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
         lmi = server_get_mount_2(dev);
         if (lmi == NULL) {
                 CERROR("Cannot get mount info for %s!\n", dev);
@@ -4715,7 +4862,6 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 rc = mdt_device_sync(&env, mdt);
                 break;
         case OBD_IOC_SET_READONLY:
-                rc = dt->dd_ops->dt_sync(&env, dt);
                 dt->dd_ops->dt_ro(&env, dt);
                 break;
         case OBD_IOC_ABORT_RECOVERY:
@@ -4837,6 +4983,42 @@ struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
         return md_ucred(info->mti_env);
 }
 
+/**
+ * Enable/disable COS.
+ *
+ * Set/Clear the COS flag in mdt options, then sync the mdt device.
+ *
+ * \param mdt mdt device
+ * \param val 0 disables COS, other values enable COS
+ */
+void mdt_enable_cos(struct mdt_device *mdt, int val)
+{
+        struct lu_env env;
+        int rc;
+
+        mdt->mdt_opts.mo_cos = !!val;
+        rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+        if (unlikely(rc != 0)) {
+                CWARN("lu_env initialization failed with rc = %d,"
+                      "cannot sync\n", rc);
+                return;
+        }
+        mdt_device_sync(&env, mdt);
+        lu_env_fini(&env);
+}
+
+/**
+ * Check COS status.
+ *
+ * \param mdt mdt device
+ * \retval 1 the COS flag is set in mdt options
+ * \retval 0 the COS flag is clear
+ */
+int mdt_cos_is_enabled(struct mdt_device *mdt)
+{
+        return mdt->mdt_opts.mo_cos != 0;
+}
+
 /* type constructor/destructor: mdt_type_init, mdt_type_fini */
 LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key);
 
diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h
index 5457a0535a..425185806c 100644
--- a/lustre/mdt/mdt_internal.h
+++ b/lustre/mdt/mdt_internal.h
@@ -127,7 +127,8 @@ struct mdt_device {
                                    mo_acl        :1,
                                    mo_compat_resname:1,
                                    mo_mds_capa   :1,
-                                   mo_oss_capa   :1;
+                                   mo_oss_capa   :1,
+                                   mo_cos        :1;
         } mdt_opts;
         /* mdt state flags */
         __u32                      mdt_fl_cfglog:1,
@@ -180,6 +181,7 @@ struct mdt_device {
 #define MDT_SERVICE_WATCHDOG_FACTOR     (2000)
 #define MDT_ROCOMPAT_SUPP       (OBD_ROCOMPAT_LOVOBJID)
 #define MDT_INCOMPAT_SUPP       (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR)
+#define MDT_COS_DEFAULT         (1)
 
 struct mdt_object {
         struct lu_object_header mot_header;
@@ -680,13 +682,15 @@ static inline int is_identity_get_disabled(struct upcall_cache *cache)
         return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1;
 }
 
+int mdt_blocking_ast(struct ldlm_lock*, struct ldlm_lock_desc*, void*, int);
+
 /* Issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
 static inline int mdt_fid_lock(struct ldlm_namespace *ns,
                                struct lustre_handle *lh,
                                ldlm_mode_t mode,
                                ldlm_policy_data_t *policy,
                                const struct ldlm_res_id *res_id,
-                               int flags)
+                               int flags, const __u64 *client_cookie)
 {
         int rc;
 
@@ -694,9 +698,9 @@ static inline int mdt_fid_lock(struct ldlm_namespace *ns,
         LASSERT(lh != NULL);
 
         rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
-                                    mode, &flags, ldlm_blocking_ast,
-                                    ldlm_completion_ast, NULL, NULL,
-                                    0, NULL, lh);
+                                    mode, &flags, mdt_blocking_ast,
+                                    ldlm_completion_ast,
+                                    NULL, NULL, 0, NULL, client_cookie, lh);
         return rc == ELDLM_OK ? 0 : -EIO;
 }
 
@@ -749,6 +753,9 @@ static inline struct lu_name *mdt_name_copy(struct lu_name *tlname,
         return tlname;
 }
 
+void mdt_enable_cos(struct mdt_device *, int);
+int mdt_cos_is_enabled(struct mdt_device *);
+
 /* lprocfs stuff */
 void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars);
 int mdt_procfs_init(struct mdt_device *mdt, const char *name);
diff --git a/lustre/mdt/mdt_lproc.c b/lustre/mdt/mdt_lproc.c
index 171c77ada1..0e95718812 100644
--- a/lustre/mdt/mdt_lproc.c
+++ b/lustre/mdt/mdt_lproc.c
@@ -425,6 +425,29 @@ static int lprocfs_mdt_wr_evict_client(struct file *file, const char *buffer,
         return count;
 }
 
+static int lprocfs_rd_cos(char *page, char **start, off_t off,
+                              int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+        /* proc read: print the COS flag (an int, 0 or 1) into page */
+        return snprintf(page, count, "%d\n", mdt_cos_is_enabled(mdt));
+}
+
+static int lprocfs_wr_cos(struct file *file, const char *buffer,
+                                  unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+        int val, rc;    /* val is filled by the helper, rc is its status */
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;      /* helper failed: COS is left untouched */
+        mdt_enable_cos(mdt, val);       /* 0 disables, non-zero enables */
+        return count;   /* success: report the whole write as consumed */
+}
+
 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
         { "uuid",                       lprocfs_rd_uuid,                 0, 0 },
         { "recovery_status",            lprocfs_obd_rd_recovery_status,  0, 0 },
@@ -447,6 +470,7 @@ static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
         { "site_stats",                 lprocfs_rd_site_stats,           0, 0 },
         { "evict_client",               0, lprocfs_mdt_wr_evict_client,     0 },
         { "hash_stats",                 lprocfs_obd_rd_hash,    0, 0 },
+        { "commit_on_sharing",          lprocfs_rd_cos, lprocfs_wr_cos, 0 },
         { 0 }
 };
 
diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c
index 32bdeb5325..74e2402891 100644
--- a/lustre/mdt/mdt_recovery.c
+++ b/lustre/mdt/mdt_recovery.c
@@ -1051,9 +1051,8 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
 
                 for (i = 0; i < oldrep->rs_nlocks; i++)
-                        ptlrpc_save_lock(req,
-                                         &oldrep->rs_locks[i],
-                                         oldrep->rs_modes[i]);
+                        ptlrpc_save_lock(req, &oldrep->rs_locks[i],
+                                         oldrep->rs_modes[i], 0);
                 oldrep->rs_nlocks = 0;
 
                 DEBUG_REQ(D_HA, req, "stole locks for");
diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c
index bfdff5f28b..4c565abae5 100644
--- a/lustre/mdt/mdt_reint.c
+++ b/lustre/mdt/mdt_reint.c
@@ -751,7 +751,9 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
                                             LCK_EX, &flags, ldlm_blocking_ast,
                                             ldlm_completion_ast, NULL, NULL, 0,
-                                            NULL, lh);
+                                            NULL,
+                                            &info->mti_exp->exp_handle.h_cookie,
+                                            lh);
         } else {
                 struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX,
                      ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL };
diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c
index ca41d1af53..e724962a64 100644
--- a/lustre/mgs/mgs_handler.c
+++ b/lustre/mgs/mgs_handler.c
@@ -348,7 +348,7 @@ static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
                                             LDLM_PLAIN, NULL, LCK_EX,
                                             &flags, ldlm_blocking_ast,
                                             ldlm_completion_ast, NULL,
-                                            fsname, 0, NULL, lockh);
+                                            fsname, 0, NULL, NULL, lockh);
         if (rc)
                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
 
diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c
index 8e3a854e9d..82ce630c73 100644
--- a/lustre/obdclass/obd_config.c
+++ b/lustre/obdclass/obd_config.c
@@ -439,11 +439,15 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
                                 obd->obd_fail = 1;
                                 obd->obd_no_transno = 1;
                                 obd->obd_no_recov = 1;
-                                /* Set the obd readonly if we can */
-                                if (OBP(obd, iocontrol))
+                                if (OBP(obd, iocontrol)) {
+                                        obd_iocontrol(OBD_IOC_SYNC,
+                                                      obd->obd_self_export,
+                                                      0, NULL, NULL);
+                                 /* Set the obd readonly if we can */
                                         obd_iocontrol(OBD_IOC_SET_READONLY,
                                                       obd->obd_self_export,
                                                       0, NULL, NULL);
+                                }
                                 break;
                         default:
                                 CERROR("unrecognised flag '%c'\n",
diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c
index 5c77003d07..663ac58856 100644
--- a/lustre/obdecho/echo.c
+++ b/lustre/obdecho/echo.c
@@ -536,7 +536,7 @@ static int echo_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN,
                                     NULL, LCK_NL, &lock_flags, NULL,
                                     ldlm_completion_ast, NULL, NULL,
-                                    0, NULL, &obd->u.echo.eo_nl_lock);
+                                    0, NULL, NULL, &obd->u.echo.eo_nl_lock);
         LASSERT (rc == ELDLM_OK);
 
         lprocfs_echo_init_vars(&lvars);
diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c
index 2809116c7e..c7b65b3108 100644
--- a/lustre/obdfilter/filter.c
+++ b/lustre/obdfilter/filter.c
@@ -1545,7 +1545,7 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid,
         rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_EXTENT,
                                     &policy, LCK_PW, &flags, ldlm_blocking_ast,
                                     ldlm_completion_ast, NULL, NULL, 0, NULL,
-                                    &lockh);
+                                    NULL, &lockh);
 
         /* We only care about the side-effects, just drop the lock. */
         if (rc == ELDLM_OK)
diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c
index 5d9a155581..98a872f99f 100644
--- a/lustre/osd/osd_handler.c
+++ b/lustre/osd/osd_handler.c
@@ -703,6 +703,28 @@ static int osd_sync(const struct lu_env *env, struct dt_device *d)
         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
 }
 
+/**
+ * Start commit for OSD device.
+ *
+ * An implementation of dt_commit_async method for OSD device.
+ * Asynchronously starts underlying fs sync (sync_fs() is called with
+ * second argument 0, i.e. no wait) and thereby a transaction commit.
+ *
+ * \param env environment (not used by this function)
+ * \param d dt device
+ * \return value returned by the superblock's sync_fs() method
+ * \see dt_device_operations
+ */
+static int osd_commit_async(const struct lu_env *env,
+                            struct dt_device *d)
+{
+        struct super_block *s = osd_sb(osd_dt_dev(d));
+        ENTRY;
+
+        CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
+        RETURN(s->s_op->sync_fs(s, 0));
+}
+
 /*
  * Concurrency: shouldn't matter.
  */
@@ -786,6 +808,7 @@ static const struct dt_device_operations osd_dt_ops = {
         .dt_conf_get       = osd_conf_get,
         .dt_sync           = osd_sync,
         .dt_ro             = osd_ro,
+        .dt_commit_async   = osd_commit_async,
         .dt_credit_get     = osd_credit_get,
         .dt_init_capa_ctxt = osd_init_capa_ctxt,
 };
diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c
index 1f613e1b43..2618d84c2c 100644
--- a/lustre/ost/ost_handler.c
+++ b/lustre/ost/ost_handler.c
@@ -86,7 +86,7 @@ void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
                 if (!ack_lock->mode)
                         break;
                 /* XXX not even calling target_send_reply in some cases... */
-                ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
+                ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
         }
 }
 
@@ -253,7 +253,8 @@ static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
                                       ldlm_blocking_ast, ldlm_completion_ast,
-                                      ldlm_glimpse_ast, NULL, 0, NULL, lh));
+                                      ldlm_glimpse_ast, NULL, 0, NULL,
+                                      NULL, lh));
 }
 
 /*
@@ -452,7 +453,8 @@ static int ost_brw_lock_get(int mode, struct obd_export *exp,
         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
                                       LDLM_EXTENT, &policy, mode, &flags,
                                       ldlm_blocking_ast, ldlm_completion_ast,
-                                      ldlm_glimpse_ast, NULL, 0, NULL, lh));
+                                      ldlm_glimpse_ast, NULL, 0, NULL,
+                                      NULL, lh));
 }
 
 static void ost_brw_lock_put(int mode,
diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
index 7cca13fb50..5133fb6e3d 100644
--- a/lustre/ptlrpc/events.c
+++ b/lustre/ptlrpc/events.c
@@ -341,7 +341,9 @@ void reply_out_callback(lnet_event_t *ev)
                  * until ptlrpc_server_handle_reply() is done with it */
                 spin_lock(&svc->srv_lock);
                 rs->rs_on_net = 0;
-                ptlrpc_schedule_difficult_reply (rs);
+                if (!rs->rs_no_ack ||
+                    rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
+                        ptlrpc_schedule_difficult_reply (rs);
                 spin_unlock(&svc->srv_lock);
         }
 
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index d79ad5b5fb..476f6662b2 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -421,7 +421,8 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int flags)
         req->rq_sent = cfs_time_current_sec();
 
         rc = ptl_send_buf (&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
-                           rs->rs_difficult ? LNET_ACK_REQ : LNET_NOACK_REQ,
+                           (rs->rs_difficult && !rs->rs_no_ack) ?
+                           LNET_ACK_REQ : LNET_NOACK_REQ,
                            &rs->rs_cb_id, conn, svc->srv_rep_portal,
                            req->rq_xid, req->rq_reply_off);
 out:
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 6507f1a66d..ee8913c1e4 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -169,7 +169,7 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
 
 void
 ptlrpc_save_lock (struct ptlrpc_request *req,
-                  struct lustre_handle *lock, int mode)
+                  struct lustre_handle *lock, int mode, int no_ack)
 {
         struct ptlrpc_reply_state *rs = req->rq_reply_state;
         int                        idx;
@@ -181,12 +181,14 @@ ptlrpc_save_lock (struct ptlrpc_request *req,
         rs->rs_locks[idx] = *lock;
         rs->rs_modes[idx] = mode;
         rs->rs_difficult = 1;
+        rs->rs_no_ack = !!no_ack;
 }
 
 void
 ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 {
         struct ptlrpc_service *svc = rs->rs_service;
+        ENTRY;
 
 #ifdef CONFIG_SMP
         LASSERT (spin_is_locked (&svc->srv_lock));
@@ -194,13 +196,16 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
         LASSERT (rs->rs_difficult);
         rs->rs_scheduled_ever = 1;              /* flag any notification attempt */
 
-        if (rs->rs_scheduled)                   /* being set up or already notified */
+        if (rs->rs_scheduled) {                  /* being set up or already notified */
+                EXIT;
                 return;
+        }
 
         rs->rs_scheduled = 1;
         list_del (&rs->rs_list);
         list_add (&rs->rs_list, &svc->srv_reply_queue);
         cfs_waitq_signal (&svc->srv_waitq);
+        EXIT;
 }
 
 void
@@ -208,6 +213,7 @@ ptlrpc_commit_replies (struct obd_device *obd)
 {
         struct list_head   *tmp;
         struct list_head   *nxt;
+        ENTRY;
 
         /* Find any replies that have been committed and get their service
          * to attend to complete them. */
@@ -232,6 +238,7 @@ ptlrpc_commit_replies (struct obd_device *obd)
         }
 
         spin_unlock(&obd->obd_uncommitted_replies_lock);
+        EXIT;
 }
 
 static int
@@ -1296,6 +1303,11 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
         if (!rs->rs_on_net) {
                 /* Off the net */
                 svc->srv_n_difficult_replies--;
+                if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
+                        /* wake up threads that are being stopped by
+                           ptlrpc_unregister_service/ptlrpc_stop_threads
+                           and sleep waiting srv_n_difficult_replies == 0 */
+                        cfs_waitq_broadcast(&svc->srv_waitq);
                 spin_unlock(&svc->srv_lock);
 
                 class_export_put (exp);
@@ -1583,7 +1595,9 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
                                struct ptlrpc_thread *thread)
 {
         struct l_wait_info lwi = { 0 };
+        ENTRY;
 
+        CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
         spin_lock(&svc->srv_lock);
         thread->t_flags = SVC_STOPPING;
         spin_unlock(&svc->srv_lock);
@@ -1597,11 +1611,13 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
         spin_unlock(&svc->srv_lock);
 
         OBD_FREE_PTR(thread);
+        EXIT;
 }
 
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
         struct ptlrpc_thread *thread;
+        ENTRY;
 
         spin_lock(&svc->srv_lock);
         while (!list_empty(&svc->srv_threads)) {
@@ -1614,6 +1630,7 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
         }
 
         spin_unlock(&svc->srv_lock);
+        EXIT;
 }
 
 int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
@@ -1708,7 +1725,9 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         struct l_wait_info    lwi;
         struct list_head     *tmp;
         struct ptlrpc_reply_state *rs, *t;
+        ENTRY;
 
+        service->srv_is_stopping = 1;
         cfs_timer_disarm(&service->srv_at_timer);
 
         ptlrpc_stop_all_threads(service);
@@ -1838,7 +1857,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         cfs_timer_disarm(&service->srv_at_timer);
 
         OBD_FREE_PTR(service);
-        return 0;
+        RETURN(0);
 }
 
 /* Returns 0 if the service is healthy.
diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c
index 0df9d23515..26c5fb392e 100644
--- a/lustre/ptlrpc/wiretest.c
+++ b/lustre/ptlrpc/wiretest.c
@@ -237,9 +237,9 @@ void lustre_assert_wire_constants(void)
                  (long long)LCK_NL);
         LASSERTF(LCK_GROUP == 64, " found %lld\n",
                  (long long)LCK_GROUP);
-        LASSERTF(LCK_MAXMODE == 65, " found %lld\n",
+        LASSERTF(LCK_MAXMODE == 129, " found %lld\n",
                  (long long)LCK_MAXMODE);
-        LASSERTF(LCK_MODE_NUM == 7, " found %lld\n",
+        LASSERTF(LCK_MODE_NUM == 8, " found %lld\n",
                  (long long)LCK_MODE_NUM);
         CLASSERT(LDLM_PLAIN == 10);
         CLASSERT(LDLM_EXTENT == 11);
diff --git a/lustre/tests/replay-dual.sh b/lustre/tests/replay-dual.sh
index dc31f909ad..80991d5a61 100755
--- a/lustre/tests/replay-dual.sh
+++ b/lustre/tests/replay-dual.sh
@@ -403,6 +403,32 @@ test_20() { #16389
 }
 run_test 20 "recovery time is not increasing"
 
+test_21() {
+    local param_file=$TMP/$tfile-params   # holds the saved COS setting
+
+    save_lustre_params $(facet_active_host $SINGLEMDS) "mdt.*.commit_on_sharing" > $param_file
+    do_facet $SINGLEMDS lctl set_param mdt.*.commit_on_sharing=1
+    touch  $MOUNT1/$tfile-1                  # create through $MOUNT1
+    mv  $MOUNT2/$tfile-1 $MOUNT2/$tfile-2    # rename through $MOUNT2
+    mv  $MOUNT1/$tfile-2 $MOUNT1/$tfile-3    # rename again through $MOUNT1
+    replay_barrier_nosync $SINGLEMDS
+    umount $MOUNT2                           # $MOUNT2 is gone before failover
+
+    facet_failover $SINGLEMDS
+
+    # all renames are replayed: the last name must exist through $MOUNT1
+    unlink  $MOUNT1/$tfile-3 || return 2
+
+    zconf_mount `hostname` $MOUNT2 || error "mount $MOUNT2 fail"
+
+    do_facet $SINGLEMDS lctl set_param mdt.*.commit_on_sharing=0
+    rm -rf $MOUNT1/$tfile-*
+    restore_lustre_params < $param_file      # put back the saved setting
+    rm -f $param_file
+    return 0
+}
+run_test 21 "commit on sharing"
+
 equals_msg `basename $0`: test complete, cleaning up
 SLEEP=$((`date +%s` - $NOW))
 [ $SLEEP -lt $TIMEOUT ] && sleep $SLEEP
diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh
index f44a51e613..422917b828 100644
--- a/lustre/tests/test-framework.sh
+++ b/lustre/tests/test-framework.sh
@@ -776,6 +776,16 @@ replay_barrier_nodf() {
     $LCTL mark "local REPLAY BARRIER on ${!svc}"
 }
 
+replay_barrier_nosync() {    # replay barrier; no sync command is issued here
+    local facet=$1               # facet the barrier is set on
+    local svc=${facet}_svc       # name of the variable read via ${!svc}
+    echo Replay barrier on ${!svc}
+    do_facet $facet $LCTL --device %${!svc} readonly
+    do_facet $facet $LCTL --device %${!svc} notransno
+    do_facet $facet $LCTL mark "$facet REPLAY BARRIER on ${!svc}"
+    $LCTL mark "local REPLAY BARRIER on ${!svc}"
+}
+
 mds_evict_client() {
     UUID=`lctl get_param -n mdc.${mds1_svc}-mdc-*.uuid`
     do_facet mds1 "lctl set_param -n mdt.${mds1_svc}.evict_client $UUID"
diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c
index f5a777b475..a48ed1e8a9 100644
--- a/lustre/utils/wiretest.c
+++ b/lustre/utils/wiretest.c
@@ -234,9 +234,9 @@ void lustre_assert_wire_constants(void)
                  (long long)LCK_NL);
         LASSERTF(LCK_GROUP == 64, " found %lld\n",
                  (long long)LCK_GROUP);
-        LASSERTF(LCK_MAXMODE == 65, " found %lld\n",
+        LASSERTF(LCK_MAXMODE == 129, " found %lld\n",
                  (long long)LCK_MAXMODE);
-        LASSERTF(LCK_MODE_NUM == 7, " found %lld\n",
+        LASSERTF(LCK_MODE_NUM == 8, " found %lld\n",
                  (long long)LCK_MODE_NUM);
         CLASSERT(LDLM_PLAIN == 10);
         CLASSERT(LDLM_EXTENT == 11);
-- 
GitLab