From d91155fec2f4e319bb17a632d9754e7d6ed1b5b2 Mon Sep 17 00:00:00 2001
From: yury <yury>
Date: Fri, 7 Nov 2008 20:39:21 +0000
Subject: [PATCH] b=17511 r=adilger,johann

- removes deadlock possibility by disabling rehash in hash_del() operations and moving hash_add()
out of spin_locks when calling. Hash table has own mechanisms for protecting its structures and it
also has hash_add_unique() method for using in concurrent run contexts;

- fixed missed lh_put() in hash_add_unique() which led to extra refs in some cases (extra ref to
export) and inability to cleanup;

- fixed __lustre_hash_set_theta() which set @max theta into ->lh_min_theta;

- in lustre_hash_rehash_size() disable rehash also for the case when new and old hash sizes equal
in corner cases (max_size or min_size). Before this fix it could be possible to do needless
rehashes when size is actually did not change but we do this expensive operation;

- disable rehash in hash_add_unique() if no actual add happened since entry with the same key is
already found in the table;

- some cleanups in hash table code;
---
 lustre/include/class_hash.h  |  6 ++---
 lustre/ldlm/ldlm_flock.c     |  3 +--
 lustre/ldlm/ldlm_lib.c       | 13 ++++------
 lustre/ldlm/ldlm_lock.c      |  3 ++-
 lustre/ldlm/ldlm_lockd.c     |  9 +++----
 lustre/mds/handler.c         |  6 ++---
 lustre/obdclass/class_hash.c | 46 +++++++++++++++++-------------------
 7 files changed, 41 insertions(+), 45 deletions(-)

diff --git a/lustre/include/class_hash.h b/lustre/include/class_hash.h
index b153cf1b92..78a946ecd0 100644
--- a/lustre/include/class_hash.h
+++ b/lustre/include/class_hash.h
@@ -233,8 +233,8 @@ void lustre_hash_exit(lustre_hash_t *lh);
 /* Hash addition functions */
 void lustre_hash_add(lustre_hash_t *lh, void *key,
                      struct hlist_node *hnode);
-int  lustre_hash_add_unique(lustre_hash_t *lh, void *key,
-                            struct hlist_node *hnode);
+int lustre_hash_add_unique(lustre_hash_t *lh, void *key,
+                           struct hlist_node *hnode);
 void *lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
                                  struct hlist_node *hnode);
 
@@ -269,7 +269,7 @@ __lustre_hash_set_theta(lustre_hash_t *lh, int min, int max)
 {
         LASSERT(min < max);
         lh->lh_min_theta = min;
-        lh->lh_min_theta = max;
+        lh->lh_max_theta = max;
 }
 
 /* Generic debug formatting routines mainly for proc handler */
diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
index 08cc7142f9..c77b40ca98 100644
--- a/lustre/ldlm/ldlm_flock.c
+++ b/lustre/ldlm/ldlm_flock.c
@@ -397,9 +397,8 @@ reprocess:
                                                 &new2->l_remote_handle,
                                                 &new2->l_exp_hash);
                 }
-                if (*flags == LDLM_FL_WAIT_NOREPROC) {
+                if (*flags == LDLM_FL_WAIT_NOREPROC)
                         ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
-                }
 
                 /* insert new2 at lock */
                 ldlm_resource_add_lock(res, ownlocks, new2);
diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c
index 4417b6792b..23f0aba9c3 100644
--- a/lustre/ldlm/ldlm_lib.c
+++ b/lustre/ldlm/ldlm_lib.c
@@ -896,14 +896,11 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                                                        req->rq_self,
                                                        &remote_uuid);
 
-        spin_lock(&target->obd_dev_lock);
-        /* Export might be hashed already, e.g. if this is reconnect */
-        if (hlist_unhashed(&export->exp_nid_hash))
-                lustre_hash_add(export->exp_obd->obd_nid_hash,
-                                &export->exp_connection->c_peer.nid,
-                                &export->exp_nid_hash);
-
-        spin_unlock(&target->obd_dev_lock);
+        if (hlist_unhashed(&export->exp_nid_hash)) {
+                lustre_hash_add_unique(export->exp_obd->obd_nid_hash,
+                                       &export->exp_connection->c_peer.nid,
+                                       &export->exp_nid_hash);
+        }
 
         if (lustre_msg_get_op_flags(req->rq_repmsg) & MSG_CONNECT_RECONNECT) {
                 revimp = class_import_get(export->exp_imp_reverse);
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index def9fc8885..32e48ced60 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -265,7 +265,8 @@ int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
         }
         lock->l_destroyed = 1;
 
-        if (lock->l_export && lock->l_export->exp_lock_hash)
+        if (lock->l_export && lock->l_export->exp_lock_hash && 
+            !hlist_unhashed(&lock->l_exp_hash))
                 lustre_hash_del(lock->l_export->exp_lock_hash,
                                 &lock->l_remote_handle, &lock->l_exp_hash);
 
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 41273ffd06..8bf6dc9e30 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -1022,10 +1022,10 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         }
         lock->l_export = class_export_get(req->rq_export);
 
-        if (lock->l_export->exp_lock_hash)
+        if (lock->l_export->exp_lock_hash) {
                 lustre_hash_add(lock->l_export->exp_lock_hash,
-                                &lock->l_remote_handle, 
-                                &lock->l_exp_hash);
+                                &lock->l_remote_handle, &lock->l_exp_hash);
+        }
 
 existing_lock:
 
@@ -1056,7 +1056,8 @@ existing_lock:
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                 lock->l_req_extent = lock->l_policy_data.l_extent;
 
-        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, (int *)&flags);
+        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, 
+                                (int *)&flags);
         if (err)
                 GOTO(out, err);
 
diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c
index a668c5f5ac..ee4fd7c2cd 100644
--- a/lustre/mds/handler.c
+++ b/lustre/mds/handler.c
@@ -2640,10 +2640,10 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
                sizeof(lock->l_remote_handle));
 
-        lustre_hash_add(new_lock->l_export->exp_lock_hash,
-                        &new_lock->l_remote_handle, &new_lock->l_exp_hash);
-
         unlock_res_and_lock(new_lock);
+        lustre_hash_add(new_lock->l_export->exp_lock_hash,
+                        &new_lock->l_remote_handle, 
+                        &new_lock->l_exp_hash);
         LDLM_LOCK_PUT(new_lock);
 
         RETURN(ELDLM_LOCK_REPLACED);
diff --git a/lustre/obdclass/class_hash.c b/lustre/obdclass/class_hash.c
index 4138cc097a..5e8fab23e9 100644
--- a/lustre/obdclass/class_hash.c
+++ b/lustre/obdclass/class_hash.c
@@ -95,11 +95,12 @@ lustre_hash_init(char *name, unsigned int cur_size, unsigned int max_size,
         lh->lh_cur_size = cur_size;
         lh->lh_min_size = cur_size;
         lh->lh_max_size = max_size;
-        lh->lh_min_theta = 500;  /* theta * 1000 */
-        lh->lh_max_theta = 2000; /* theta * 1000 */
         lh->lh_ops = ops;
         lh->lh_flags = flags;
 
+        /* theta * 1000 */
+        __lustre_hash_set_theta(lh, 500, 2000);
+
         OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
         if (!lh->lh_buckets) {
                 OBD_FREE_PTR(lh);
@@ -156,18 +157,24 @@ EXPORT_SYMBOL(lustre_hash_exit);
 
 static inline unsigned int lustre_hash_rehash_size(lustre_hash_t *lh)
 {
+        int size;
+
         if (!(lh->lh_flags & LH_REHASH))
                 return 0;
 
         if ((lh->lh_cur_size < lh->lh_max_size) &&
             (__lustre_hash_theta(lh) > lh->lh_max_theta))
-                return MIN(lh->lh_cur_size * 2, lh->lh_max_size);
-
-        if ((lh->lh_cur_size > lh->lh_min_size) &&
+                size = min(lh->lh_cur_size * 2, lh->lh_max_size);
+        else if ((lh->lh_cur_size > lh->lh_min_size) &&
             (__lustre_hash_theta(lh) < lh->lh_min_theta))
-                return MAX(lh->lh_cur_size / 2, lh->lh_min_size);
+                size = max(lh->lh_cur_size / 2, lh->lh_min_size);
+        else
+                size = 0;
 
-        return 0;
+        if (lh->lh_cur_size == size)
+                size = 0;
+
+        return size;
 }
   
 /**
@@ -207,9 +214,9 @@ static struct hlist_node *
 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
                                  struct hlist_node *hnode)
 {
+        int                   size = 0;
         struct hlist_node    *ehnode;
         lustre_hash_bucket_t *lhb;
-        int                   size;
         unsigned              i;
         ENTRY;
   
@@ -228,10 +235,9 @@ lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
         } else {
                 __lustre_hash_bucket_add(lh, lhb, hnode);
                 ehnode = hnode;
+                size = lustre_hash_rehash_size(lh);
         }
         write_unlock(&lhb->lhb_rwlock);
-
-        size = lustre_hash_rehash_size(lh);
         read_unlock(&lh->lh_rwlock);
         if (size)
                 lustre_hash_rehash(lh, size);
@@ -251,9 +257,10 @@ lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
         ENTRY;
         
         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
-        if (ehnode != hnode)
+        if (ehnode != hnode) {
+                lh_put(lh, ehnode);
                 RETURN(-EALREADY);
-        
+        }
         RETURN(0);
 }
 EXPORT_SYMBOL(lustre_hash_add_unique);
@@ -290,7 +297,6 @@ void *
 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
 {
         lustre_hash_bucket_t *lhb;
-        int                   size;
         unsigned              i;
         void                 *obj;
         ENTRY;
@@ -306,11 +312,7 @@ lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
         write_lock(&lhb->lhb_rwlock);
         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
         write_unlock(&lhb->lhb_rwlock);
-
-        size = lustre_hash_rehash_size(lh);
         read_unlock(&lh->lh_rwlock);
-        if (size)
-                lustre_hash_rehash(lh, size);
   
         RETURN(obj);
 }
@@ -327,7 +329,6 @@ lustre_hash_del_key(lustre_hash_t *lh, void *key)
 {
         struct hlist_node    *hnode;
         lustre_hash_bucket_t *lhb;
-        int                   size;
         unsigned              i;
         void                 *obj = NULL;
         ENTRY;
@@ -343,11 +344,7 @@ lustre_hash_del_key(lustre_hash_t *lh, void *key)
                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
 
         write_unlock(&lhb->lhb_rwlock);
-
-        size = lustre_hash_rehash_size(lh);
         read_unlock(&lh->lh_rwlock);
-        if (size)
-                lustre_hash_rehash(lh, size);
   
         RETURN(obj);
 }
@@ -498,8 +495,8 @@ restart:
         EXIT;
 }
 EXPORT_SYMBOL(lustre_hash_for_each_empty);
-  
-  /*
+
+/*
  * For each item in the lustre hash @lh which matches the @key call
  * the passed callback @func and pass to it as an argument each hash
  * item and the private @data.  Before each callback ops->lh_get will
@@ -566,6 +563,7 @@ lustre_hash_rehash(lustre_hash_t *lh, int size)
         ENTRY;
   
         LASSERT(size > 0);
+        LASSERT(!in_interrupt());
   
         OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) * size);
         if (!rehash_buckets)
-- 
GitLab