From fb4ae6ca52c5cace142c4e4f00071be17741c37a Mon Sep 17 00:00:00 2001 From: jxiong <jxiong> Date: Sat, 16 Jun 2007 07:23:18 +0000 Subject: [PATCH] b=10706 r=adilger,johann use RCU to improve the scalability of class handle object search, reloaded. --- lustre/autoconf/lustre-core.m4 | 32 +++++++ lustre/include/liblustre.h | 3 + lustre/include/linux/lustre_handles.h | 12 ++- lustre/include/lustre_handles.h | 9 ++ lustre/include/obd_support.h | 24 ++++++ lustre/ldlm/ldlm_lock.c | 13 ++- lustre/lov/lov_internal.h | 9 +- lustre/mds/mds_open.c | 3 +- lustre/obdclass/class_obd.c | 3 +- lustre/obdclass/genops.c | 7 +- lustre/obdclass/lustre_handles.c | 120 ++++++++++++++++++-------- 11 files changed, 186 insertions(+), 49 deletions(-) diff --git a/lustre/autoconf/lustre-core.m4 b/lustre/autoconf/lustre-core.m4 index 965460a80e..383c170fa6 100644 --- a/lustre/autoconf/lustre-core.m4 +++ b/lustre/autoconf/lustre-core.m4 @@ -1073,6 +1073,7 @@ LC_FUNC_HAVE_CAN_SLEEP_ARG LC_FUNC_F_OP_FLOCK LC_QUOTA_READ LC_COOKIE_FOLLOW_LINK +LC_FUNC_RCU # 2.6.15 LC_INODE_I_MUTEX @@ -1218,6 +1219,37 @@ LB_LINUX_TRY_COMPILE([ ]) ]) +# +# LC_FUNC_RCU +# +# kernels prior to 2.6.0(?) have no RCU support; in kernel 2.6.5(SUSE), +# call_rcu takes three parameters. 
+# +AC_DEFUN([LC_FUNC_RCU], +[AC_MSG_CHECKING([if kernel have RCU supported]) +LB_LINUX_TRY_COMPILE([ + #include <linux/rcupdate.h> +],[],[ + AC_DEFINE(HAVE_RCU, 1, [have RCU defined]) + AC_MSG_RESULT([yes]) + + AC_MSG_CHECKING([if call_rcu takes three parameters]) + LB_LINUX_TRY_COMPILE([ + #include <linux/rcupdate.h> + ],[ + struct rcu_head rh; + call_rcu(&rh, (void (*)(struct rcu_head *))1, NULL); + ],[ + AC_DEFINE(HAVE_CALL_RCU_PARAM, 1, [call_rcu takes three parameters]) + AC_MSG_RESULT([yes]) + ],[ + AC_MSG_RESULT([no]) + ]) +],[ + AC_MSG_RESULT([no]) +]) +]) + # # LC_CONFIGURE # diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index b7a7d5967d..a79f470355 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -295,6 +295,8 @@ extern int echo_client_init(void); #define EXPORT_SYMBOL(S) +struct rcu_head { }; + typedef struct { } spinlock_t; typedef __u64 kdev_t; @@ -709,6 +711,7 @@ typedef struct { volatile int counter; } atomic_t; #define atomic_dec(a) do { (a)->counter--; } while (0) #define atomic_add(b,a) do {(a)->counter += b;} while (0) #define atomic_sub(b,a) do {(a)->counter -= b;} while (0) +#define ATOMIC_INIT(i) { i } #ifndef likely #define likely(exp) (exp) diff --git a/lustre/include/linux/lustre_handles.h b/lustre/include/linux/lustre_handles.h index 21eb047c79..166beb3d37 100644 --- a/lustre/include/linux/lustre_handles.h +++ b/lustre/include/linux/lustre_handles.h @@ -10,6 +10,16 @@ #include <asm/atomic.h> #include <linux/list.h> #include <linux/random.h> -#endif +#include <linux/version.h> +#include <linux/spinlock.h> +#include <linux/types.h> + +# ifdef HAVE_RCU +# include <linux/rcupdate.h> /* for rcu_head{} */ +# else +struct rcu_head { }; +# endif + +#endif /* ifdef __KERNEL__ */ #endif diff --git a/lustre/include/lustre_handles.h b/lustre/include/lustre_handles.h index bbd2fcd7d1..ac56c27fa7 100644 --- a/lustre/include/lustre_handles.h +++ b/lustre/include/lustre_handles.h @@ -29,7 +29,15 @@ struct 
portals_handle { struct list_head h_link; __u64 h_cookie; portals_handle_addref_cb h_addref; + + /* newly added fields to handle the RCU issue. -jxiong */ + spinlock_t h_lock; + unsigned int h_size; + void *h_ptr; + void (*h_free_cb)(void *, size_t); + struct rcu_head h_rcu; }; +#define RCU2HANDLE(rcu) container_of(rcu, struct portals_handle, h_rcu) /* handles.c */ @@ -37,6 +45,7 @@ struct portals_handle { void class_handle_hash(struct portals_handle *, portals_handle_addref_cb); void class_handle_unhash(struct portals_handle *); void *class_handle2object(__u64 cookie); +void class_handle_free_cb(struct rcu_head *); int class_handle_init(void); void class_handle_cleanup(void); diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 10678f895b..04c6509f4f 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -358,8 +358,32 @@ do { \ cfs_free(ptr); \ (ptr) = (void *)0xdeadbeef; \ } while (0) + +#ifdef HAVE_RCU +# ifdef HAVE_CALL_RCU_PARAM +# define my_call_rcu(rcu, cb) call_rcu(rcu, cb, rcu) +# else +# define my_call_rcu(rcu, cb) call_rcu(rcu, cb) +# endif +#else +# define my_call_rcu(rcu, cb) (cb)(rcu) +#endif + +#define OBD_FREE_RCU_CB(ptr, size, handle, free_cb) \ +do { \ + struct portals_handle *__h = (handle); \ + LASSERT(handle); \ + __h->h_ptr = (ptr); \ + __h->h_size = (size); \ + __h->h_free_cb = (void (*)(void *, size_t))(free_cb); \ + my_call_rcu(&__h->h_rcu, class_handle_free_cb); \ + (ptr) = (void *)0xdeadbeef; \ +} while(0) +#define OBD_FREE_RCU(ptr, size, handle) OBD_FREE_RCU_CB(ptr, size, handle, NULL) #else #define OBD_FREE(ptr, size) ((void)(size), free((ptr))) +#define OBD_FREE_RCU(ptr, size, handle) (OBD_FREE(ptr, size)) +#define OBD_FREE_RCU_CB(ptr, size, handle, cb) ((*(cb))(ptr, size)) #endif #ifdef __arch_um__ diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index dda01b7e80..b02a08ee54 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -128,6 +128,12 @@ struct 
ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock) return lock; } +static void ldlm_lock_free(struct ldlm_lock *lock, size_t size) +{ + LASSERT(size == sizeof(*lock)); + OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock)); +} + void ldlm_lock_put(struct ldlm_lock *lock) { ENTRY; @@ -150,13 +156,16 @@ void ldlm_lock_put(struct ldlm_lock *lock) atomic_dec(&res->lr_namespace->ns_locks); ldlm_resource_putref(res); lock->l_resource = NULL; - if (lock->l_export) + if (lock->l_export) { class_export_put(lock->l_export); + lock->l_export = NULL; + } if (lock->l_lvb_data != NULL) OBD_FREE(lock->l_lvb_data, lock->l_lvb_len); - OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock)); + OBD_FREE_RCU_CB(lock, sizeof(*lock), &lock->l_handle, + ldlm_lock_free); } EXIT; diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index 7099226760..62771c837c 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -111,9 +111,12 @@ static inline void lov_llh_put(struct lov_lock_handles *llh) atomic_read(&llh->llh_refcount) < 0x5a5a); if (atomic_dec_and_test(&llh->llh_refcount)) { class_handle_unhash(&llh->llh_handle); - LASSERT(list_empty(&llh->llh_handle.h_link)); - OBD_FREE(llh, sizeof *llh + - sizeof(*llh->llh_handles) * llh->llh_stripe_count); + /* The structure may be held by other threads because of RCU. 
-jxiong */ + if (atomic_read(&llh->llh_refcount)) + return; + + OBD_FREE_RCU(llh, sizeof *llh + + sizeof(*llh->llh_handles) * llh->llh_stripe_count, &llh->llh_handle); } } diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 0bb9878d24..29515822c1 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -110,8 +110,7 @@ static void mds_mfd_put(struct mds_file_data *mfd) LASSERT(atomic_read(&mfd->mfd_refcount) > 0 && atomic_read(&mfd->mfd_refcount) < 0x5a5a); if (atomic_dec_and_test(&mfd->mfd_refcount)) { - LASSERT(list_empty(&mfd->mfd_handle.h_link)); - OBD_FREE(mfd, sizeof *mfd); + OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_handle); } } diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 0b5cc50960..2959d78486 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -416,6 +416,7 @@ EXPORT_SYMBOL(lustre_uuid_to_peer); EXPORT_SYMBOL(class_handle_hash); EXPORT_SYMBOL(class_handle_unhash); EXPORT_SYMBOL(class_handle2object); +EXPORT_SYMBOL(class_handle_free_cb); /* obd_config.c */ EXPORT_SYMBOL(class_incref); @@ -508,7 +509,6 @@ int obd_init_checks(void) #endif extern spinlock_t obd_types_lock; -extern spinlock_t handle_lock; extern int class_procfs_init(void); extern int class_procfs_clean(void); @@ -532,7 +532,6 @@ int init_obdclass(void) #endif spin_lock_init(&obd_types_lock); - spin_lock_init(&handle_lock); cfs_waitq_init(&obd_race_waitq); obd_zombie_impexp_init(); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 924d717d9f..b2c17e8581 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -638,10 +638,9 @@ void class_export_destroy(struct obd_export *exp) ptlrpc_put_connection_superhack(exp->exp_connection); LASSERT(list_empty(&exp->exp_outstanding_replies)); - LASSERT(list_empty(&exp->exp_handle.h_link)); obd_destroy_export(exp); - OBD_FREE(exp, sizeof(*exp)); + OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle); class_decref(obd); } @@ -776,10 +775,8 @@ void 
class_import_destroy(struct obd_import *import) OBD_FREE(imp_conn, sizeof(*imp_conn)); } - LASSERT(list_empty(&import->imp_handle.h_link)); class_decref(import->imp_obd); - OBD_FREE(import, sizeof(*import)); - + OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle); EXIT; } EXPORT_SYMBOL(class_import_put); diff --git a/lustre/obdclass/lustre_handles.c b/lustre/obdclass/lustre_handles.c index 9d4d1ecfd7..328d9f6238 100644 --- a/lustre/obdclass/lustre_handles.c +++ b/lustre/obdclass/lustre_handles.c @@ -32,11 +32,25 @@ #include <lustre_handles.h> #include <lustre_lib.h> -spinlock_t handle_lock; +#if !defined(HAVE_RCU) || !defined(__KERNEL__) +# define list_add_rcu list_add +# define list_del_rcu list_del +# define list_for_each_rcu list_for_each +# define list_for_each_safe_rcu list_for_each_safe +# define rcu_read_lock() spin_lock(&bucket->lock) +# define rcu_read_unlock() spin_unlock(&bucket->lock) +#endif /* ifndef HAVE_RCU */ + static __u64 handle_base; #define HANDLE_INCR 7 -static struct list_head *handle_hash = NULL; -static int handle_count = 0; +static spinlock_t handle_base_lock; + +static struct handle_bucket { + spinlock_t lock; + struct list_head head; +} *handle_hash; + +static atomic_t handle_count = ATOMIC_INIT(0); #define HANDLE_HASH_SIZE (1 << 14) #define HANDLE_HASH_MASK (HANDLE_HASH_SIZE - 1) @@ -47,25 +61,20 @@ static int handle_count = 0; */ void class_handle_hash(struct portals_handle *h, portals_handle_addref_cb cb) { - struct list_head *bucket; + struct handle_bucket *bucket; ENTRY; LASSERT(h != NULL); LASSERT(list_empty(&h->h_link)); - spin_lock(&handle_lock); - /* * This is fast, but simplistic cookie generation algorithm, it will * need a re-do at some point in the future for security. 
*/ - h->h_cookie = handle_base; + spin_lock(&handle_base_lock); handle_base += HANDLE_INCR; - bucket = handle_hash + (h->h_cookie & HANDLE_HASH_MASK); - list_add(&h->h_link, bucket); - handle_count++; - + h->h_cookie = handle_base; if (unlikely(handle_base == 0)) { /* * Cookie of zero is "dangerous", because in many places it's @@ -75,10 +84,17 @@ void class_handle_hash(struct portals_handle *h, portals_handle_addref_cb cb) CWARN("The universe has been exhausted: cookie wrap-around.\n"); handle_base += HANDLE_INCR; } + spin_unlock(&handle_base_lock); - spin_unlock(&handle_lock); - + atomic_inc(&handle_count); h->h_addref = cb; + spin_lock_init(&h->h_lock); + + bucket = &handle_hash[h->h_cookie & HANDLE_HASH_MASK]; + spin_lock(&bucket->lock); + list_add_rcu(&h->h_link, &bucket->head); + spin_unlock(&bucket->lock); + CDEBUG(D_INFO, "added object %p with handle "LPX64" to hash\n", h, h->h_cookie); EXIT; @@ -95,56 +111,90 @@ static void class_handle_unhash_nolock(struct portals_handle *h) CDEBUG(D_INFO, "removing object %p with handle "LPX64" from hash\n", h, h->h_cookie); - handle_count--; - list_del_init(&h->h_link); + spin_lock(&h->h_lock); + if (h->h_cookie == 0) { + spin_unlock(&h->h_lock); + return; + } + h->h_cookie = 0; + spin_unlock(&h->h_lock); + list_del_rcu(&h->h_link); } void class_handle_unhash(struct portals_handle *h) { - spin_lock(&handle_lock); + struct handle_bucket *bucket; + bucket = handle_hash + (h->h_cookie & HANDLE_HASH_MASK); + + spin_lock(&bucket->lock); class_handle_unhash_nolock(h); - spin_unlock(&handle_lock); + spin_unlock(&bucket->lock); + + atomic_dec(&handle_count); } void *class_handle2object(__u64 cookie) { - struct list_head *bucket, *tmp; + struct handle_bucket *bucket; + struct list_head *tmp; void *retval = NULL; ENTRY; LASSERT(handle_hash != NULL); + /* Be careful when you want to change this code. See the + * rcu_read_lock() definition at the top of this file. 
- jxiong */ bucket = handle_hash + (cookie & HANDLE_HASH_MASK); - spin_lock(&handle_lock); - list_for_each(tmp, bucket) { + rcu_read_lock(); + list_for_each_rcu(tmp, &bucket->head) { struct portals_handle *h; h = list_entry(tmp, struct portals_handle, h_link); + if (h->h_cookie != cookie) + continue; - if (h->h_cookie == cookie) { + spin_lock(&h->h_lock); + if (likely(h->h_cookie != 0)) { h->h_addref(h); retval = h; - break; } + spin_unlock(&h->h_lock); + break; } - spin_unlock(&handle_lock); + rcu_read_unlock(); RETURN(retval); } +void class_handle_free_cb(struct rcu_head *rcu) +{ + struct portals_handle *h = RCU2HANDLE(rcu); + if (h->h_free_cb) { + h->h_free_cb(h->h_ptr, h->h_size); + } else { + void *ptr = h->h_ptr; + unsigned int size = h->h_size; + OBD_FREE(ptr, size); + } +} + + int class_handle_init(void) { - struct list_head *bucket; + struct handle_bucket *bucket; LASSERT(handle_hash == NULL); - OBD_VMALLOC(handle_hash, sizeof(*handle_hash) * HANDLE_HASH_SIZE); + OBD_VMALLOC(handle_hash, sizeof(*bucket) * HANDLE_HASH_SIZE); if (handle_hash == NULL) return -ENOMEM; + spin_lock_init(&handle_base_lock); for (bucket = handle_hash + HANDLE_HASH_SIZE - 1; bucket >= handle_hash; - bucket--) - CFS_INIT_LIST_HEAD(bucket); + bucket--) { + CFS_INIT_LIST_HEAD(&bucket->head); + spin_lock_init(&bucket->lock); + } ll_get_random_bytes(&handle_base, sizeof(handle_base)); LASSERT(handle_base != 0ULL); @@ -156,10 +206,10 @@ static void cleanup_all_handles(void) { int i; - spin_lock(&handle_lock); for (i = 0; i < HANDLE_HASH_SIZE; i++) { struct list_head *tmp, *pos; - list_for_each_safe(tmp, pos, &(handle_hash[i])) { + spin_lock(&handle_hash[i].lock); + list_for_each_safe_rcu(tmp, pos, &(handle_hash[i].head)) { struct portals_handle *h; h = list_entry(tmp, struct portals_handle, h_link); @@ -168,22 +218,24 @@ static void cleanup_all_handles(void) class_handle_unhash_nolock(h); } + spin_unlock(&handle_hash[i].lock); } - spin_unlock(&handle_lock); } void 
class_handle_cleanup(void) { + int count; LASSERT(handle_hash != NULL); - if (handle_count != 0) { - CERROR("handle_count at cleanup: %d\n", handle_count); + count = atomic_read(&handle_count); + if (count != 0) { + CERROR("handle_count at cleanup: %d\n", count); cleanup_all_handles(); } OBD_VFREE(handle_hash, sizeof(*handle_hash) * HANDLE_HASH_SIZE); handle_hash = NULL; - if (handle_count) - CERROR("leaked %d handles\n", handle_count); + if (atomic_read(&handle_count)) + CERROR("leaked %d handles\n", atomic_read(&handle_count)); } -- GitLab