From 160acfb6eae1a7d75f6020e64f68386fff830946 Mon Sep 17 00:00:00 2001
From: vitaly <vitaly>
Date: Fri, 14 Sep 2007 17:05:34 +0000
Subject: [PATCH] Branch b1_6 b=13563 i=adilger i=green

1) cancel LRU locks in ldlm_cli_cancel() and send a single batched cancel RPC;
2) pass the whole list of canceled locks to the async blocking thread at once,
instead of lock-by-lock, and send the cancels in batched cancel RPCs (a
user-space sketch of the list hand-off follows the diffstat).
---
 lustre/ldlm/ldlm_internal.h |  7 +++-
 lustre/ldlm/ldlm_lock.c     |  2 +-
 lustre/ldlm/ldlm_lockd.c    | 62 ++++++++++++++++++---------
 lustre/ldlm/ldlm_request.c  | 83 ++++++++++++++++++++-----------------
 4 files changed, 94 insertions(+), 60 deletions(-)
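
Note: as a rough illustration of change 2), the sketch below models the list
hand-off done in ldlm_bl_to_thread() as a stand-alone user-space C program.
The list helpers are minimal re-implementations of the kernel list_head
macros, and struct lock / struct bl_work_item are invented stand-ins for the
real ldlm_lock and ldlm_bl_work_item types; this is a sketch of the idiom,
not Lustre code.

#include <stdio.h>
#include <stddef.h>

/* Minimal doubly-linked circular list, after the kernel's list_head. */
struct list_head {
        struct list_head *next, *prev;
};

static void INIT_LIST_HEAD(struct list_head *h)
{
        h->next = h->prev = h;
}

static void list_add(struct list_head *new, struct list_head *head)
{
        new->next = head->next;
        new->prev = head;
        head->next->prev = new;
        head->next = new;
}

static void list_del_init(struct list_head *entry)
{
        entry->prev->next = entry->next;
        entry->next->prev = entry->prev;
        INIT_LIST_HEAD(entry);
}

/* Invented stand-ins for ldlm_lock and ldlm_bl_work_item. */
struct lock {
        int              id;
        struct list_head l_bl_ast;
};

struct bl_work_item {
        struct list_head blwi_head;
        int              blwi_count;
};

int main(void)
{
        struct list_head cancels;
        struct lock locks[3];
        struct bl_work_item blwi;
        struct list_head *pos;
        int i;

        /* Build a cancel list, as ldlm_cancel_lru_local() would. */
        INIT_LIST_HEAD(&cancels);
        for (i = 0; i < 3; i++) {
                locks[i].id = i;
                list_add(&locks[i].l_bl_ast, &cancels);
        }

        /* The O(1) hand-off from the patch: link blwi_head into the
         * cancel list, then unlink the old head.  The whole chain of
         * locks now hangs off the work item and can be queued to the
         * blocking thread in one go. */
        list_add(&blwi.blwi_head, &cancels);
        list_del_init(&cancels);
        blwi.blwi_count = 3;

        /* The blocking thread's side: walk the locks it received. */
        for (pos = blwi.blwi_head.next; pos != &blwi.blwi_head;
             pos = pos->next) {
                struct lock *lk = (struct lock *)
                        ((char *)pos - offsetof(struct lock, l_bl_ast));
                printf("canceling lock %d\n", lk->id);
        }
        return 0;
}

The point of the idiom is that the hand-off costs the same no matter how many
locks are on the list, so the async LRU cancel path queues one work item
instead of one per lock.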

diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h
index 440a21e857..2c5335c5e0 100644
--- a/lustre/ldlm/ldlm_internal.h
+++ b/lustre/ldlm/ldlm_internal.h
@@ -87,8 +87,11 @@ void ldlm_lock_touch_in_lru(struct ldlm_lock *lock);
 void ldlm_lock_destroy_nolock(struct ldlm_lock *lock);
 
 /* ldlm_lockd.c */
-int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                      struct ldlm_lock *lock, int flags);
+int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                           struct ldlm_lock *lock);
+int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                           struct list_head *cancels, int count);
+
 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
 
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 4e62e68e9a..d885fd4b76 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -630,7 +630,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 ldlm_lock_remove_from_lru(lock);
                 unlock_res_and_lock(lock);
                 if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
-                    ldlm_bl_to_thread(ns, NULL, lock, 0) != 0)
+                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                         ldlm_handle_bl_callback(ns, NULL, lock);
         } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
                    !lock->l_readers && !lock->l_writers &&
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index ca5f7f8b0c..f55e74d59f 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -94,7 +94,8 @@ struct ldlm_bl_work_item {
         struct ldlm_namespace   *blwi_ns;
         struct ldlm_lock_desc   blwi_ld;
         struct ldlm_lock        *blwi_lock;
-        int                     blwi_flags;
+        struct list_head        blwi_head;
+        int                     blwi_count;
 };
 
 #ifdef __KERNEL__
@@ -1335,7 +1336,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                            cfs_time_add(lock->l_last_used, 
                                         cfs_time_seconds(10)))) {
                 unlock_res_and_lock(lock);
-                if (ldlm_bl_to_thread(ns, NULL, lock, 0))
+                if (ldlm_bl_to_thread_lock(ns, NULL, lock))
                         ldlm_handle_bl_callback(ns, NULL, lock);
 
                 EXIT;
@@ -1357,14 +1358,18 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
         return ptlrpc_reply(req);
 }
 
-int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                      struct ldlm_lock *lock, int flags)
-{
 #ifdef __KERNEL__
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+                             struct list_head *cancels, int count)
+{
         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
         struct ldlm_bl_work_item *blwi;
         ENTRY;
 
+        if (cancels && count == 0)
+                RETURN(0);
+
         OBD_ALLOC(blwi, sizeof(*blwi));
         if (blwi == NULL)
                 RETURN(-ENOMEM);
@@ -1372,15 +1377,37 @@ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
         blwi->blwi_ns = ns;
         if (ld != NULL)
                 blwi->blwi_ld = *ld;
-        blwi->blwi_lock = lock;
-        blwi->blwi_flags = flags;
-
+        if (count) {
+                list_add(&blwi->blwi_head, cancels);
+                list_del_init(cancels);
+                blwi->blwi_count = count;
+        } else {
+                blwi->blwi_lock = lock;
+        }
         spin_lock(&blp->blp_lock);
         list_add_tail(&blwi->blwi_entry, &blp->blp_list);
         cfs_waitq_signal(&blp->blp_waitq);
         spin_unlock(&blp->blp_lock);
 
         RETURN(0);
+}
+#endif
+
+int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                           struct ldlm_lock *lock)
+{
+#ifdef __KERNEL__
+        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+#else
+        RETURN(-ENOSYS);
+#endif
+}
+
+int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                           struct list_head *cancels, int count)
+{
+#ifdef __KERNEL__
+        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
 #else
         RETURN(-ENOSYS);
 #endif
@@ -1529,7 +1556,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 CDEBUG(D_INODE, "blocking ast\n");
                 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
                         ldlm_callback_reply(req, 0);
-                if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0))
+                if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                 break;
         case LDLM_CP_CALLBACK:
@@ -1649,18 +1676,13 @@ static int ldlm_bl_thread_main(void *arg)
                 if (blwi->blwi_ns == NULL)
                         break;
 
-                if (blwi->blwi_flags == LDLM_FL_CANCELING) {
+                if (blwi->blwi_count) {
                         /* The special case when we cancel locks in lru
-                         * asynchronously, then we first remove the lock from
-                         * l_bl_ast explicitely in ldlm_cancel_lru before
-                         * sending it to this thread. Thus lock is marked
-                         * LDLM_FL_CANCELING, and already cancelled locally. */
-                        CFS_LIST_HEAD(head);
-                        LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast));
-                        list_add(&blwi->blwi_lock->l_bl_ast, &head);
-                        ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export,
-                                            &head, 1);
-                        LDLM_LOCK_PUT(blwi->blwi_lock);
+                         * asynchronously: the whole list of locks is passed
+                         * here at once. Each lock is already marked
+                         * LDLM_FL_CANCELING and canceled locally. */
+                        ldlm_cli_cancel_list(&blwi->blwi_head,
+                                             blwi->blwi_count, NULL, 0);
                 } else {
                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                 blwi->blwi_lock);
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 651dfd1893..20704285ae 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -469,18 +469,28 @@ cleanup:
 /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
  * a single page on the send/receive side. XXX: 512 should be changed
  * to more adequate value. */
-#define ldlm_req_handles_avail(exp, size, bufcount, off)                \
-({                                                                      \
-        int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);      \
-        int _s = size[DLM_LOCKREQ_OFF];                                 \
-        size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);            \
-        _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \
-                                  bufcount, size);                      \
-        _avail /= sizeof(struct lustre_handle);                         \
-        _avail += LDLM_LOCKREQ_HANDLES - off;                           \
-        size[DLM_LOCKREQ_OFF] = _s;                                     \
-        _avail;                                                         \
-})
+static inline int ldlm_req_handles_avail(struct obd_export *exp,
+                                         int *size, int bufcount, int off)
+{
+        int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
+        int old_size = size[DLM_LOCKREQ_OFF];
+
+        size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+        avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
+                                 bufcount, size);
+        avail /= sizeof(struct lustre_handle);
+        avail += LDLM_LOCKREQ_HANDLES - off;
+        size[DLM_LOCKREQ_OFF] = old_size;
+
+        return avail;
+}
+
+static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
+{
+        int size[2] = { sizeof(struct ptlrpc_body),
+                        sizeof(struct ldlm_request) };
+        return ldlm_req_handles_avail(exp, size, 2, 0);
+}
 
 /* Cancel lru locks and pack them into the enqueue request. Pack there the given
  * @count locks in @cancels. */
@@ -501,15 +511,14 @@ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                 /* Estimate the amount of free space in the request. */
                 int avail = ldlm_req_handles_avail(exp, size, bufcount,
                                                    LDLM_ENQUEUE_CANCEL_OFF);
+                int to_free = exp_connect_lru_resize(exp) ? 0 : 1;
                 
                 LASSERT(avail >= count);
-
                 /* Cancel lru locks here _only_ if the server supports 
                  * EARLY_CANCEL. Otherwise we have to send extra CANCEL
                  * rpc right on enqueue, what will make it slower, vs. 
                  * asynchronous rpc in blocking thread. */
-                count += ldlm_cancel_lru_local(ns, cancels, 
-                                               exp_connect_lru_resize(exp) ? 0 : 1,
+                count += ldlm_cancel_lru_local(ns, cancels, to_free,
                                                avail - count, LDLM_CANCEL_AGED);
                 size[DLM_LOCKREQ_OFF] =
                         ldlm_request_bufsize(count, LDLM_ENQUEUE);
@@ -904,6 +913,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp,
                                "out of sync -- not fatal\n",
                                libcfs_nid2str(req->rq_import->
                                               imp_connection->c_peer.nid));
+                        rc = 0;
                 } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
                            req->rq_import_generation == imp->imp_generation) {
                         ptlrpc_req_finished(req);
@@ -982,15 +992,22 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         }
         
         rc = ldlm_cli_cancel_local(lock);
-        if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
-                GOTO(out, rc);
-
         list_add(&lock->l_bl_ast, &cancels);
-        rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1);
-        EXIT;
- out:
-        LDLM_LOCK_PUT(lock);
-        return rc < 0 ? rc : 0;
+
+        if (rc == LDLM_FL_BL_AST) {
+                rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1);
+        } else if (rc == LDLM_FL_CANCELING) {
+                int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+                int count = 1;
+                LASSERT(avail > 0);
+                count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace,
+                                               &cancels, 0, avail - 1,
+                                               LDLM_CANCEL_AGED);
+                ldlm_cli_cancel_list(&cancels, count, NULL, 0);
+        }
+        if (rc != LDLM_FL_CANCELING)
+                LDLM_LOCK_PUT(lock);
+        RETURN(rc < 0 ? rc : 0);
 }
 
 /* - Free space in lru for @count new locks,
@@ -1159,21 +1176,13 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync)
 #endif
         count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0);
         if (sync == LDLM_ASYNC) {
-                struct ldlm_lock *lock, *next;
-                list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) {
-                        /* Remove from the list to allow blocking thread to
-                         * re-use l_bl_ast. */
-                        list_del_init(&lock->l_bl_ast);
-                        rc = ldlm_bl_to_thread(ns, NULL, lock,
-                                               LDLM_FL_CANCELING);
-                        if (rc)
-                                list_add_tail(&lock->l_bl_ast, &next->l_bl_ast);
-                }
+                rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
+                if (rc == 0)
+                        RETURN(count);
         }
-
-        /* If some locks are left in the list in ASYNC mode, or
+        /* If an error occurred in ASYNC mode, or
          * this is SYNC mode, cancel the list. */
-        ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         RETURN(count);
 }
 
@@ -1329,7 +1338,7 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
         count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE,
                                            0, flags, opaque);
-        rc = ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF);
+        rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         if (rc != ELDLM_OK)
                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
 
-- 
GitLab