From ba0bdbd09ee5f2c43fc9725023f022c33b1c3a92 Mon Sep 17 00:00:00 2001
From: deen <deen>
Date: Wed, 18 Jul 2007 23:36:50 +0000
Subject: [PATCH] Instead of sending blocking and completion callbacks as
 separate requests, add them to a set and send them in parallel.

b=11301
i=green
i=wangdi
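
Roughly, both ldlm_run_bl_ast_work() and ldlm_run_cp_ast_work() now collect
the callback RPCs in a ptlrpc request set and flush it in batches of
PARALLEL_AST_LIMIT; an abridged sketch of the new flow (names are taken from
this patch, details simplified):

    arg.set = ptlrpc_prep_set();
    list_for_each_safe(tmp, pos, rpc_list) {
            /* each AST callback queues its request on arg.set via
             * ptlrpc_set_add_req() instead of ptlrpc_queue_wait() */
            rc = lock->l_blocking_ast(lock, &d, (void *)&arg,
                                      LDLM_CB_BLOCKING);
            if (++ast_count == PARALLEL_AST_LIMIT) {
                    /* send the whole batch in parallel, then start a
                     * fresh set for the remaining locks */
                    ldlm_send_and_maybe_create_set(&arg, 1);
                    ast_count = 0;
            }
    }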
---
 lustre/ChangeLog            |   6 +++
 lustre/ldlm/ldlm_internal.h |  11 ++++
 lustre/ldlm/ldlm_lock.c     |  97 +++++++++++++++++++++++++++-------
 lustre/ldlm/ldlm_lockd.c    | 101 +++++++++++++++++++++++++-----------
 4 files changed, 166 insertions(+), 49 deletions(-)

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index f883e7c6b6..94b4f78510 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -509,6 +509,12 @@ Details    : the definition for OBD_IOC_GETNAME changed in 1.6.0.  One of the
 	     Add the old ioctl number to the handler so both old and new
 	     lfsck can work.
 
+Severity   : normal
+Bugzilla   : 11301
+Description: parallel lock callbacks
+Details    : Instead of sending blocking and completion callbacks as separate
+             requests, add them to a set and send them in parallel.
+
 --------------------------------------------------------------------------------
 
 2007-05-03  Cluster File Systems, Inc. <info@clusterfs.com>
diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h
index a11a85a8b7..fe460c0f74 100644
--- a/lustre/ldlm/ldlm_internal.h
+++ b/lustre/ldlm/ldlm_internal.h
@@ -21,6 +21,17 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                      struct ldlm_lock *new);
 
 /* ldlm_lock.c */
+
+/* Number of blocking/completion callbacks that will be sent in
+ * parallel (see bug 11301). */
+#define PARALLEL_AST_LIMIT      200
+
+struct ldlm_cb_set_arg {
+        struct ptlrpc_request_set *set;
+        atomic_t restart;
+        __u16 type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
+};
+
 void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list);
 struct ldlm_lock *
 ldlm_lock_create(struct ldlm_namespace *ns, struct ldlm_res_id,
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index e95d2c3f4d..b496dba02e 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -1222,13 +1222,38 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
         RETURN(rc);
 }
 
+/* Helper for the ldlm_run_{bl,cp}_ast_work() pair.
+ *
+ * Send the RPC set specified by @arg->set and then
+ * destroy it. Create a new one if the @do_create flag is set. */
+static void
+ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
+{
+        int rc;
+
+        rc = ptlrpc_set_wait(arg->set);
+        if (arg->type == LDLM_BL_CALLBACK)
+                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
+        ptlrpc_set_destroy(arg->set);
+
+        if (do_create)
+                arg->set = ptlrpc_prep_set();
+}
+
 int ldlm_run_bl_ast_work(struct list_head *rpc_list)
 {
+        struct ldlm_cb_set_arg arg;
         struct list_head *tmp, *pos;
         struct ldlm_lock_desc d;
-        int rc = 0, retval = 0;
+        int ast_count;
+        int rc = 0;
         ENTRY;
 
+        arg.set = ptlrpc_prep_set();
+        atomic_set(&arg.restart, 0);
+        arg.type = LDLM_BL_CALLBACK;
+
+        ast_count = 0;
         list_for_each_safe(tmp, pos, rpc_list) {
                 struct ldlm_lock *lock =
                         list_entry(tmp, struct ldlm_lock, l_bl_ast);
@@ -1247,24 +1272,44 @@ int ldlm_run_bl_ast_work(struct list_head *rpc_list)
 
                 LDLM_LOCK_PUT(lock->l_blocking_lock);
                 lock->l_blocking_lock = NULL;
-                rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING);
-
-                if (rc == -ERESTART)
-                        retval = rc;
-                else if (rc)
-                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
-                               "disconnect client\n");
+                rc = lock->l_blocking_ast(lock, &d, (void *)&arg,
+                                          LDLM_CB_BLOCKING);
                 LDLM_LOCK_PUT(lock);
+                ast_count++;
+
+                /* Send the request set once it reaches PARALLEL_AST_LIMIT,
+                 * and create a new set for the requests remaining in
+                 * @rpc_list. */
+                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
+                        ldlm_send_and_maybe_create_set(&arg, 1);
+                        ast_count = 0;
+                }
         }
-        RETURN(retval);
+
+        if (ast_count > 0)
+                ldlm_send_and_maybe_create_set(&arg, 0);
+        else
+                /* If the number of ASTs is a multiple of
+                 * PARALLEL_AST_LIMIT, or @rpc_list was initially empty,
+                 * @arg.set must be destroyed here, otherwise we
+                 * would leak memory. */
+                ptlrpc_set_destroy(arg.set);
+
+        RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
 }
 
 int ldlm_run_cp_ast_work(struct list_head *rpc_list)
 {
+        struct ldlm_cb_set_arg arg;
         struct list_head *tmp, *pos;
-        int rc = 0, retval = 0;
+        int ast_count;
+        int rc = 0;
         ENTRY;
 
+        arg.set = ptlrpc_prep_set();
+        atomic_set(&arg.restart, 0);
+        arg.type = LDLM_CP_CALLBACK;
+
         /* It's possible to receive a completion AST before we've set
          * the l_completion_ast pointer: either because the AST arrived
          * before the reply, or simply because there's a small race
@@ -1276,6 +1321,7 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list)
          * reader/writer reference, which we won't do until we get the
          * reply and finish enqueueing. */
         
+        ast_count = 0;
         list_for_each_safe(tmp, pos, rpc_list) {
                 struct ldlm_lock *lock =
                         list_entry(tmp, struct ldlm_lock, l_cp_ast);
@@ -1287,16 +1333,31 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list)
                 lock->l_flags &= ~LDLM_FL_CP_REQD;
                 unlock_res_and_lock(lock);
 
-                if (lock->l_completion_ast != NULL)
-                        rc = lock->l_completion_ast(lock, 0, 0);
-                if (rc == -ERESTART)
-                        retval = rc;
-                else if (rc)
-                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
-                               "disconnect client\n");
+                if (lock->l_completion_ast != NULL) {
+                        rc = lock->l_completion_ast(lock, 0, (void *)&arg);
+                        ast_count++;
+                }
                 LDLM_LOCK_PUT(lock);
+
+                /* Send the request set once it reaches PARALLEL_AST_LIMIT,
+                 * and create a new set for the requests remaining in
+                 * @rpc_list. */
+                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
+                        ldlm_send_and_maybe_create_set(&arg, 1);
+                        ast_count = 0;
+                }
         }
-        RETURN(retval);
+
+        if (ast_count > 0)
+                ldlm_send_and_maybe_create_set(&arg, 0);
+        else
+                /* If the number of ASTs is a multiple of
+                 * PARALLEL_AST_LIMIT, or @rpc_list was initially empty,
+                 * @arg.set must be destroyed here, otherwise we
+                 * would leak memory. */
+                ptlrpc_set_destroy(arg.set);
+
+        RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index f4d5d9b119..ed48f82959 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -479,15 +479,70 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
         return rc;
 }
 
+static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc)
+{
+        struct ldlm_cb_set_arg *arg;
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        LASSERT(data != NULL);
+
+        arg = req->rq_async_args.pointer_arg[0];
+        lock = req->rq_async_args.pointer_arg[1];
+        LASSERT(lock != NULL);
+        if (rc != 0) {
+                /* If the client canceled the lock but the cancel has not
+                 * been received yet, we need to update the lvbo to have the
+                 * proper attributes cached. */
+                if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
+                        ldlm_res_lvbo_update(lock->l_resource, NULL,
+                                             0, 1);
+                rc = ldlm_handle_ast_error(lock, req, rc,
+                                           arg->type == LDLM_BL_CALLBACK
+                                           ? "blocking" : "completion");
+        }
+
+        LDLM_LOCK_PUT(lock);
+
+        if (rc == -ERESTART)
+                atomic_set(&arg->restart, 1);
+
+        RETURN(0);
+}
+
+static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+                                          struct ldlm_cb_set_arg *arg,
+                                          struct ldlm_lock *lock,
+                                          int instant_cancel)
+{
+        int rc = 0;
+        ENTRY;
+
+        if (unlikely(instant_cancel)) {
+                rc = ptl_send_rpc(req, 1);
+                ptlrpc_req_finished(req);
+                if (rc == 0)
+                        /* If we cancelled the lock, we need to restart
+                         * ldlm_reprocess_queue */
+                        atomic_set(&arg->restart, 1);
+        } else {
+                LDLM_LOCK_GET(lock);
+                ptlrpc_set_add_req(arg->set, req);
+        }
+
+        RETURN(rc);
+}
+
 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                              struct ldlm_lock_desc *desc,
                              void *data, int flag)
 {
+        struct ldlm_cb_set_arg *arg = (struct ldlm_cb_set_arg *)data;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
         int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int instant_cancel = 0, rc = 0;
+        int instant_cancel = 0, rc;
         ENTRY;
 
         if (flag == LDLM_CB_CANCELING) {
@@ -496,12 +551,17 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         }
 
         LASSERT(lock);
+        LASSERT(data != NULL);
 
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size,
                               NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
+
+        req->rq_async_args.pointer_arg[0] = arg;
+        req->rq_async_args.pointer_arg[1] = lock;
+        req->rq_interpret_reply = ldlm_cb_interpret;
 
         lock_res(lock->l_resource);
         if (lock->l_granted_mode != lock->l_req_mode) {
@@ -555,42 +615,25 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
 
-        if (unlikely(instant_cancel)) {
-                rc = ptl_send_rpc(req, 1);
-        } else {
-                rc = ptlrpc_queue_wait(req);
-                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
-        }
-        if (rc != 0) {
-                /* If client canceled the lock but the cancel has not been
-                 * recieved yet, we need to update lvbo to have the proper
-                 * attributes cached. */
-                if (rc == -EINVAL)
-                        ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1);
-                rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
-        }
-        
-        ptlrpc_req_finished(req);
-
-        /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
-        if (!rc && instant_cancel)
-                rc = -ERESTART;
+        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
 
 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
+        struct ldlm_cb_set_arg *arg = (struct ldlm_cb_set_arg *)data;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
         struct timeval granted_time;
         long total_enqueue_wait;
         int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                         [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int rc = 0, buffers = 2, instant_cancel = 0;
+        int rc, buffers = 2, instant_cancel = 0;
         ENTRY;
 
         LASSERT(lock != NULL);
+        LASSERT(data != NULL);
 
         do_gettimeofday(&granted_time);
         total_enqueue_wait = cfs_timeval_sub(&granted_time,
@@ -613,6 +656,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (req == NULL)
                 RETURN(-ENOMEM);
 
+        req->rq_async_args.pointer_arg[0] = arg;
+        req->rq_async_args.pointer_arg[1] = lock;
+        req->rq_interpret_reply = ldlm_cb_interpret;
+
         body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
         body->lock_handle[0] = lock->l_remote_handle;
         body->lock_flags = flags;
@@ -663,16 +710,8 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (lock->l_export && lock->l_export->exp_ldlm_stats)
                 lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
-        
-        rc = ptlrpc_queue_wait(req);
-        if (rc != 0)
-                rc = ldlm_handle_ast_error(lock, req, rc, "completion");
 
-        ptlrpc_req_finished(req);
-
-        /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
-        if (!rc && instant_cancel)
-                rc = -ERESTART;
+        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
-- 
GitLab