From c83c2bee2dfa881007fa697ed912dd64574a0c3e Mon Sep 17 00:00:00 2001
From: yury <yury>
Date: Fri, 7 Nov 2008 22:37:57 +0000
Subject: [PATCH] b=17511 r=johann,adilger

  - removes deadlock possibility by disabling rehash in hash_del() operations and moving hash_add()
    out of spin_locks when calling. Hash table has own mechanisms for protecting its structures and it
    also has hash_add_unique() method for using in concurrent run contexts;

    - fixed missed lh_put() in hash_add_unique() which led to extra refs in some cases (extra ref to
    export) and inability to cleanup;

    - fixed __lustre_hash_set_theta() which set @max theta into ->lh_min_theta;

    - in lustre_hash_rehash_size() disable rehash also for the case when new and old hash sizes equal
    in corner cases (max_size or min_size). Before this fix it could be possible to do needless
    rehashes when size is actually did not change but we do this expensive operation;

    - disable rehash in hash_add_unique() if no actual add happened since entry with the same key is
    already found in the table;

    - some cleanups in hash table code;
---
 lustre/include/class_hash.h  |  6 ++---
 lustre/ldlm/ldlm_flock.c     |  3 +--
 lustre/ldlm/ldlm_lib.c       | 15 +++++--------
 lustre/lov/lov_pool.c        | 12 +++++-----
 lustre/mdt/mdt_handler.c     |  3 ++-
 lustre/obdclass/class_hash.c | 43 ++++++++++++++++++------------------
 6 files changed, 37 insertions(+), 45 deletions(-)

diff --git a/lustre/include/class_hash.h b/lustre/include/class_hash.h
index 70349cb092..e99efe84c4 100644
--- a/lustre/include/class_hash.h
+++ b/lustre/include/class_hash.h
@@ -242,8 +242,8 @@ void lustre_hash_exit(lustre_hash_t *lh);
  */
 void lustre_hash_add(lustre_hash_t *lh, void *key,
                      struct hlist_node *hnode);
-int  lustre_hash_add_unique(lustre_hash_t *lh, void *key,
-                            struct hlist_node *hnode);
+int lustre_hash_add_unique(lustre_hash_t *lh, void *key,
+                           struct hlist_node *hnode);
 void *lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
                                  struct hlist_node *hnode);
 
@@ -284,7 +284,7 @@ __lustre_hash_set_theta(lustre_hash_t *lh, int min, int max)
 {
         LASSERT(min < max);
         lh->lh_min_theta = min;
-        lh->lh_min_theta = max;
+        lh->lh_max_theta = max;
 }
 
 /* 
diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
index 2e888ffe2c..d38cfdff86 100644
--- a/lustre/ldlm/ldlm_flock.c
+++ b/lustre/ldlm/ldlm_flock.c
@@ -405,9 +405,8 @@ reprocess:
                                                 &new2->l_remote_handle,
                                                 &new2->l_exp_hash);
                 }
-                if (*flags == LDLM_FL_WAIT_NOREPROC) {
+                if (*flags == LDLM_FL_WAIT_NOREPROC)
                         ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
-                }
 
                 /* insert new2 at lock */
                 ldlm_resource_add_lock(res, ownlocks, new2);
diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c
index 3431f63569..f6b0a4da38 100644
--- a/lustre/ldlm/ldlm_lib.c
+++ b/lustre/ldlm/ldlm_lib.c
@@ -910,16 +910,11 @@ dont_check_exports:
         export->exp_connection = ptlrpc_connection_get(req->rq_peer,
                                                        req->rq_self,
                                                        &remote_uuid);
-
-        spin_lock(&target->obd_dev_lock);
-
-        /* Export might be hashed already, e.g. if this is reconnect */
-        if (hlist_unhashed(&export->exp_nid_hash))
-                lustre_hash_add(export->exp_obd->obd_nid_hash,
-                                &export->exp_connection->c_peer.nid,
-                                &export->exp_nid_hash);
-
-        spin_unlock(&target->obd_dev_lock);
+        if (hlist_unhashed(&export->exp_nid_hash)) {
+                lustre_hash_add_unique(export->exp_obd->obd_nid_hash,
+                                       &export->exp_connection->c_peer.nid,
+                                       &export->exp_nid_hash);
+        }
 
         spin_lock_bh(&target->obd_processing_task_lock);
         if (target->obd_recovering && !export->exp_in_recovery) {
diff --git a/lustre/lov/lov_pool.c b/lustre/lov/lov_pool.c
index 12115ebd17..f63fac745b 100644
--- a/lustre/lov/lov_pool.c
+++ b/lustre/lov/lov_pool.c
@@ -400,18 +400,16 @@ int lov_pool_new(struct obd_device *obd, char *poolname)
                 GOTO(out_err, rc);
         }
 
-        spin_lock(&obd->obd_dev_lock);
-        /* check if pool already exists */
-        if (lustre_hash_lookup(lov->lov_pools_hash_body, poolname) != NULL) {
-                spin_unlock(&obd->obd_dev_lock);
+        INIT_HLIST_NODE(&new_pool->pool_hash);
+        rc = lustre_hash_add_unique(lov->lov_pools_hash_body, poolname,
+                                    &new_pool->pool_hash);
+        if (rc) {
                 lov_ost_pool_free(&new_pool->pool_rr.lqr_pool);
                 lov_ost_pool_free(&new_pool->pool_obds);
                 GOTO(out_err, rc = -EEXIST);
         }
 
-        INIT_HLIST_NODE(&new_pool->pool_hash);
-        lustre_hash_add_unique(lov->lov_pools_hash_body, poolname,
-                               &new_pool->pool_hash);
+        spin_lock(&obd->obd_dev_lock);
         list_add_tail(&new_pool->pool_list, &lov->lov_pool_list);
         lov->lov_pool_count++;
         spin_unlock(&obd->obd_dev_lock);
diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c
index ffdc67c158..91943141d5 100644
--- a/lustre/mdt/mdt_handler.c
+++ b/lustre/mdt/mdt_handler.c
@@ -2885,11 +2885,12 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
         new_lock->l_remote_handle = lock->l_remote_handle;
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
+        unlock_res_and_lock(new_lock);
+
         lustre_hash_add(new_lock->l_export->exp_lock_hash,
                         &new_lock->l_remote_handle,
                         &new_lock->l_exp_hash);
 
-        unlock_res_and_lock(new_lock);
         LDLM_LOCK_RELEASE(new_lock);
         lh->mlh_reg_lh.cookie = 0;
 
diff --git a/lustre/obdclass/class_hash.c b/lustre/obdclass/class_hash.c
index 0dd18f137d..ee90aaf3cb 100644
--- a/lustre/obdclass/class_hash.c
+++ b/lustre/obdclass/class_hash.c
@@ -95,11 +95,12 @@ lustre_hash_init(char *name, unsigned int cur_size, unsigned int max_size,
         lh->lh_cur_size = cur_size;
         lh->lh_min_size = cur_size;
         lh->lh_max_size = max_size;
-        lh->lh_min_theta = 500;  /* theta * 1000 */
-        lh->lh_max_theta = 2000; /* theta * 1000 */
         lh->lh_ops = ops;
         lh->lh_flags = flags;
 
+        /* theta * 1000 */
+        __lustre_hash_set_theta(lh, 500, 2000);
+
         OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
         if (!lh->lh_buckets) {
                 OBD_FREE_PTR(lh);
@@ -156,18 +157,24 @@ EXPORT_SYMBOL(lustre_hash_exit);
 
 static inline unsigned int lustre_hash_rehash_size(lustre_hash_t *lh)
 {
+        int size;
+
         if (!(lh->lh_flags & LH_REHASH))
                 return 0;
 
         if ((lh->lh_cur_size < lh->lh_max_size) &&
             (__lustre_hash_theta(lh) > lh->lh_max_theta))
-                return MIN(lh->lh_cur_size * 2, lh->lh_max_size);
-
-        if ((lh->lh_cur_size > lh->lh_min_size) &&
+                size = min(lh->lh_cur_size * 2, lh->lh_max_size);
+        else if ((lh->lh_cur_size > lh->lh_min_size) &&
             (__lustre_hash_theta(lh) < lh->lh_min_theta))
-                return MAX(lh->lh_cur_size / 2, lh->lh_min_size);
+                size = max(lh->lh_cur_size / 2, lh->lh_min_size);
+        else
+                size = 0;
 
-        return 0;
+        if (lh->lh_cur_size == size)
+                size = 0;
+
+        return size;
 }
 
 /**
@@ -207,9 +214,9 @@ static struct hlist_node *
 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
                                  struct hlist_node *hnode)
 {
+        int                   size = 0;
         struct hlist_node    *ehnode;
         lustre_hash_bucket_t *lhb;
-        int                   size;
         unsigned              i;
         ENTRY;
 
@@ -228,10 +235,9 @@ lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
         } else {
                 __lustre_hash_bucket_add(lh, lhb, hnode);
                 ehnode = hnode;
+                size = lustre_hash_rehash_size(lh);
         }
         write_unlock(&lhb->lhb_rwlock);
-
-        size = lustre_hash_rehash_size(lh);
         read_unlock(&lh->lh_rwlock);
         if (size)
                 lustre_hash_rehash(lh, size);
@@ -251,8 +257,10 @@ lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
         ENTRY;
 
         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
-        if (ehnode != hnode)
+        if (ehnode != hnode) {
+                lh_put(lh, ehnode);
                 RETURN(-EALREADY);
+        }
 
         RETURN(0);
 }
@@ -290,7 +298,6 @@ void *
 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
 {
         lustre_hash_bucket_t *lhb;
-        int                   size;
         unsigned              i;
         void                 *obj;
         ENTRY;
@@ -306,11 +313,7 @@ lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
         write_lock(&lhb->lhb_rwlock);
         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
         write_unlock(&lhb->lhb_rwlock);
-
-        size = lustre_hash_rehash_size(lh);
         read_unlock(&lh->lh_rwlock);
-        if (size)
-                lustre_hash_rehash(lh, size);
 
         RETURN(obj);
 }
@@ -327,7 +330,6 @@ lustre_hash_del_key(lustre_hash_t *lh, void *key)
 {
         struct hlist_node    *hnode;
         lustre_hash_bucket_t *lhb;
-        int                   size;
         unsigned              i;
         void                 *obj = NULL;
         ENTRY;
@@ -343,11 +345,7 @@ lustre_hash_del_key(lustre_hash_t *lh, void *key)
                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
 
         write_unlock(&lhb->lhb_rwlock);
-
-        size = lustre_hash_rehash_size(lh);
         read_unlock(&lh->lh_rwlock);
-        if (size)
-                lustre_hash_rehash(lh, size);
 
         RETURN(obj);
 }
@@ -499,7 +497,7 @@ restart:
 }
 EXPORT_SYMBOL(lustre_hash_for_each_empty);
 
-  /*
+/**
  * For each item in the lustre hash @lh which matches the @key call
  * the passed callback @func and pass to it as an argument each hash
  * item and the private @data.  Before each callback ops->lh_get will
@@ -566,6 +564,7 @@ lustre_hash_rehash(lustre_hash_t *lh, int size)
         ENTRY;
 
         LASSERT(size > 0);
+        LASSERT(!in_interrupt());
 
         OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) * size);
         if (!rehash_buckets)
-- 
GitLab