From 05d045af278ceb3e06df568f2f5c052d217e6539 Mon Sep 17 00:00:00 2001
From: yury <yury>
Date: Thu, 20 Sep 2007 07:46:47 +0000
Subject: [PATCH] b=13632 r=adilger, nikita - Do not take the namespace list
 lock while doing pool shrink or recalc, so that lock cancellation may be done
 without the ns list lock.

---
 lustre/include/lustre_dlm.h |  6 ++++
 lustre/ldlm/ldlm_pool.c     | 70 ++++++++++++++++++++++++++-----------
 lustre/ldlm/ldlm_resource.c | 51 +++++++++++++++++++++++++--
 3 files changed, 104 insertions(+), 23 deletions(-)

diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 4ee524fc15..4d1d11c7e4 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -654,6 +654,12 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client,
                                           ldlm_appetite_t apt);
 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags);
 int ldlm_namespace_free(struct ldlm_namespace *ns, int force);
+void ldlm_namespace_move(struct ldlm_namespace *ns, ldlm_side_t client);
+struct ldlm_namespace *ldlm_namespace_first(ldlm_side_t client);
+void ldlm_namespace_get(struct ldlm_namespace *ns);
+void ldlm_namespace_put(struct ldlm_namespace *ns, int wakeup);
+void ldlm_namespace_get_nolock(struct ldlm_namespace *ns);
+void ldlm_namespace_put_nolock(struct ldlm_namespace *ns, int wakeup);
 int ldlm_proc_setup(void);
 #ifdef LPROCFS
 void ldlm_proc_cleanup(void);
diff --git a/lustre/ldlm/ldlm_pool.c b/lustre/ldlm/ldlm_pool.c
index 35840bfeec..eaf1fc4b4f 100644
--- a/lustre/ldlm/ldlm_pool.c
+++ b/lustre/ldlm/ldlm_pool.c
@@ -723,8 +723,8 @@ EXPORT_SYMBOL(ldlm_pools_wakeup);
 static int ldlm_pools_shrink(ldlm_side_t client, int nr, 
                              unsigned int gfp_mask)
 {
+        int total = 0, cached = 0, nr_ns;
         struct ldlm_namespace *ns;
-        int total = 0, cached = 0;
 
         if (nr != 0 && !(gfp_mask & __GFP_FS))
                 return -1;
@@ -732,28 +732,42 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr,
         CDEBUG(D_DLMTRACE, "request to shrink %d %s locks from all pools\n",
                nr, client == LDLM_NAMESPACE_CLIENT ? "client" : "server");
 
-        if (down_trylock(ldlm_namespace_lock(client)))
-                return nr != 0 ? -1 : 0;
-
+        /* Find out how many resources we may release. */
+        mutex_down(ldlm_namespace_lock(client));
         list_for_each_entry(ns, ldlm_namespace_list(client), ns_list_chain)
                 total += ldlm_pool_granted(&ns->ns_pool);
+        mutex_up(ldlm_namespace_lock(client));
 
-        if (nr == 0) {
-                mutex_up(ldlm_namespace_lock(client));
+        if (nr == 0)
                 return total;
-        }
 
-        /* Check all namespaces. */
-        list_for_each_entry(ns, ldlm_namespace_list(client), ns_list_chain) {
-                struct ldlm_pool *pl = &ns->ns_pool;
+        /* Shrink at least ldlm_namespace_nr(client) namespaces. */
+        for (nr_ns = atomic_read(ldlm_namespace_nr(client)); 
+             nr_ns > 0; nr_ns--) 
+        {
                 int cancel, nr_locks;
 
+                /* Do not call shrink under ldlm_namespace_lock(client) */
+                mutex_down(ldlm_namespace_lock(client));
+                if (list_empty(ldlm_namespace_list(client))) {
+                        mutex_up(ldlm_namespace_lock(client));
+                        /* If the list is empty, we must not return any
+                         * @cached > 0, as that would probably cause a
+                         * needless shrinker call. */
+                        cached = 0;
+                        break;
+                }
+                ns = ldlm_namespace_first(client);
+                ldlm_namespace_get(ns);
+                ldlm_namespace_move(ns, client);
+                mutex_up(ldlm_namespace_lock(client));
+                
                 nr_locks = ldlm_pool_granted(&ns->ns_pool);
                 cancel = 1 + nr_locks * nr / total;
-                ldlm_pool_shrink(pl, cancel, gfp_mask);
+                ldlm_pool_shrink(&ns->ns_pool, cancel, gfp_mask);
                 cached += ldlm_pool_granted(&ns->ns_pool);
+                ldlm_namespace_put(ns, 1);
         }
-        mutex_up(ldlm_namespace_lock(client));
         return cached;
 }
 
@@ -771,7 +785,7 @@ void ldlm_pools_recalc(ldlm_side_t client)
 {
         __u32 nr_l = 0, nr_p = 0, l;
         struct ldlm_namespace *ns;
-        int rc, equal = 0;
+        int rc, nr, equal = 0;
 
         /* Check all modest namespaces. */
         mutex_down(ldlm_namespace_lock(client));
@@ -791,15 +805,9 @@ void ldlm_pools_recalc(ldlm_side_t client)
                         nr_l += l;
                         nr_p++;
                 }
-
-                /* After setup is done - recalc the pool. */
-                rc = ldlm_pool_recalc(&ns->ns_pool);
-                if (rc)
-                        CERROR("%s: pool recalculation error "
-                               "%d\n", ns->ns_pool.pl_name, rc);
         }
 
-        /* Make sure that modest namespaces did not eat more that 1/3 of limit */
+        /* Make sure that modest namespaces did not eat more than 2/3 of limit */
         if (nr_l >= 2 * (LDLM_POOL_HOST_L / 3)) {
                 CWARN("Modest pools eat out 2/3 of locks limit. %d of %lu. "
                       "Upgrade server!\n", nr_l, LDLM_POOL_HOST_L);
@@ -827,14 +835,34 @@ void ldlm_pools_recalc(ldlm_side_t client)
                         }
                         ldlm_pool_setup(&ns->ns_pool, l);
                 }
+        }
+        mutex_up(ldlm_namespace_lock(client));
+
+        /* Recalc at least ldlm_namespace_nr(client) namespaces. */
+        for (nr = atomic_read(ldlm_namespace_nr(client)); nr > 0; nr--) {
+                /* Lock the list, get first @ns in the list, getref, move it
+                 * to the tail, unlock and call pool recalc. This way we avoid
+                 * calling recalc under @ns lock, which is really good as we get
+                 * rid of potential deadlock on client nodes when canceling
+                 * locks synchronously. */
+                mutex_down(ldlm_namespace_lock(client));
+                if (list_empty(ldlm_namespace_list(client))) {
+                        mutex_up(ldlm_namespace_lock(client));
+                        break;
+                }
+                ns = ldlm_namespace_first(client);
+                ldlm_namespace_get(ns);
+                ldlm_namespace_move(ns, client);
+                mutex_up(ldlm_namespace_lock(client));
 
                 /* After setup is done - recalc the pool. */
                 rc = ldlm_pool_recalc(&ns->ns_pool);
                 if (rc)
                         CERROR("%s: pool recalculation error "
                                "%d\n", ns->ns_pool.pl_name, rc);
+
+                ldlm_namespace_put(ns, 1);
         }
-        mutex_up(ldlm_namespace_lock(client));
 }
 EXPORT_SYMBOL(ldlm_pools_recalc);
 
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index ef25d4fcfc..922dba09f9 100644
--- a/lustre/ldlm/ldlm_resource.c
+++ b/lustre/ldlm/ldlm_resource.c
@@ -557,6 +557,51 @@ int ldlm_namespace_free(struct ldlm_namespace *ns, int force)
         return ELDLM_OK;
 }
 
+void ldlm_namespace_get_nolock(struct ldlm_namespace *ns)
+{
+        LASSERT(ns->ns_refcount >= 0);
+        ns->ns_refcount++;
+}
+
+void ldlm_namespace_get(struct ldlm_namespace *ns)
+{
+        spin_lock(&ns->ns_hash_lock);
+        ldlm_namespace_get_nolock(ns);
+        spin_unlock(&ns->ns_hash_lock);
+}
+
+void ldlm_namespace_put_nolock(struct ldlm_namespace *ns, int wakeup)
+{
+        LASSERT(ns->ns_refcount > 0);
+        ns->ns_refcount--;
+        if (ns->ns_refcount == 0 && wakeup)
+                wake_up(&ns->ns_waitq);
+}
+
+void ldlm_namespace_put(struct ldlm_namespace *ns, int wakeup)
+{
+        spin_lock(&ns->ns_hash_lock);
+        ldlm_namespace_put_nolock(ns, wakeup);
+        spin_unlock(&ns->ns_hash_lock);
+}
+
+/* Should be called under ldlm_namespace_lock(client) taken */
+void ldlm_namespace_move(struct ldlm_namespace *ns, ldlm_side_t client)
+{
+        LASSERT(!list_empty(&ns->ns_list_chain));
+        LASSERT_SEM_LOCKED(ldlm_namespace_lock(client));
+        list_move_tail(&ns->ns_list_chain, ldlm_namespace_list(client));
+}
+
+/* Should be called under ldlm_namespace_lock(client) taken */
+struct ldlm_namespace *ldlm_namespace_first(ldlm_side_t client)
+{
+        LASSERT_SEM_LOCKED(ldlm_namespace_lock(client));
+        LASSERT(!list_empty(ldlm_namespace_list(client)));
+        return container_of(ldlm_namespace_list(client)->next, 
+                struct ldlm_namespace, ns_list_chain);
+}
+
 static __u32 ldlm_hash_fn(struct ldlm_resource *parent, struct ldlm_res_id name)
 {
         __u32 hash = 0;
@@ -655,7 +700,7 @@ ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent,
         bucket = ns->ns_hash + hash;
         list_add(&res->lr_hash, bucket);
         ns->ns_resources++;
-        ns->ns_refcount++;
+        ldlm_namespace_get_nolock(ns);
 
         if (parent == NULL) {
                 list_add(&res->lr_childof, &ns->ns_root_list);
@@ -752,7 +797,9 @@ void __ldlm_resource_putref_final(struct ldlm_resource *res)
                 LBUG();
         }
 
-        ns->ns_refcount--;
+        /* Pass 0 as second argument to not wake up ->ns_waitq yet, will do it
+         * later. */
+        ldlm_namespace_put_nolock(ns, 0);
         list_del_init(&res->lr_hash);
         list_del_init(&res->lr_childof);
 
-- 
GitLab