diff --git a/lustre/include/class_hash.h b/lustre/include/class_hash.h index 53246130a83d3b5d4c3db27f4308ba306606262f..b153cf1b924b09cb6b25110a23e117f70acfd077 100644 --- a/lustre/include/class_hash.h +++ b/lustre/include/class_hash.h @@ -39,140 +39,286 @@ #include <lustre_lib.h> -/* #define LUSTRE_HASH_DEBUG 1 */ - -/* define the hash bucket*/ -struct lustre_hash_bucket { - struct hlist_head lhb_head; - spinlock_t lhb_lock; -#ifdef LUSTRE_HASH_DEBUG - /* the number of hash item per bucket, - * it will help us to analyse the hash distribute - */ - int lhb_item_count; -#endif -}; - -struct lustre_hash_operations; - -struct lustre_class_hash_body { - char hashname[128]; - spinlock_t lchb_lock; /* body lock */ - struct lustre_hash_bucket *lchb_hash_tables; - __u32 lchb_hash_max_size; /* define the hash tables size */ - /* define the hash operations */ - struct lustre_hash_operations *lchb_hash_operations; -}; - -/* hash operations method define */ -struct lustre_hash_operations { - __u32 (*lustre_hashfn) (struct lustre_class_hash_body *hash_body, - void *key); - int (*lustre_hash_key_compare) (void *key, - struct hlist_node *compared_hnode); - /* add refcount */ - void* (*lustre_hash_object_refcount_get) (struct hlist_node *hash_item); - /* dec refcount */ - void (*lustre_hash_object_refcount_put) (struct hlist_node *hash_item); -}; - -static inline struct hlist_node * -lustre_hash_getitem_in_bucket_nolock(struct lustre_class_hash_body *hash_body, - int hashent, void *key) -{ - struct lustre_hash_bucket *bucket; - struct hlist_node *hash_item_node; - struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; - int find = 0; - ENTRY; - - bucket = &hash_body->lchb_hash_tables[hashent]; - hlist_for_each(hash_item_node, &(bucket->lhb_head)) { - find = hop->lustre_hash_key_compare(key, hash_item_node); - if (find == 1) - break; +struct lustre_hash_ops; + +typedef struct lustre_hash_bucket { + struct hlist_head lhb_head; /* entries list */ + atomic_t lhb_count; 
/* current entries */ + rwlock_t lhb_rwlock; /* lustre_hash_bucket */ +} lustre_hash_bucket_t; + +#define LUSTRE_MAX_HASH_NAME 16 + +typedef struct lustre_hash { + int lh_cur_size; /* current hash size */ + int lh_min_size; /* min hash size */ + int lh_max_size; /* max hash size */ + int lh_min_theta; /* resize min threshold */ + int lh_max_theta; /* resize max threshold */ + int lh_flags; /* hash flags */ + atomic_t lh_count; /* current entries */ + atomic_t lh_rehash_count;/* resize count */ + struct lustre_hash_bucket *lh_buckets; /* hash buckets */ + struct lustre_hash_ops *lh_ops; /* hash operations */ + rwlock_t lh_rwlock; /* lustre_hash */ + char lh_name[LUSTRE_MAX_HASH_NAME]; +} lustre_hash_t; + +typedef struct lustre_hash_ops { + unsigned (*lh_hash)(lustre_hash_t *lh, void *key, unsigned mask); + void * (*lh_key)(struct hlist_node *hnode); + int (*lh_compare)(void *key, struct hlist_node *hnode); + void * (*lh_get)(struct hlist_node *hnode); + void * (*lh_put)(struct hlist_node *hnode); + void (*lh_exit)(struct hlist_node *hnode); +} lustre_hash_ops_t; + +#define LH_DEBUG 0x0001 /* Enable expensive debug checks */ +#define LH_REHASH 0x0002 /* Enable dynamic hash resizing */ + +#define LHO(lh) (lh)->lh_ops +#define LHP(lh, op) (lh)->lh_ops->lh_ ## op + +static inline unsigned +lh_hash(lustre_hash_t *lh, void *key, unsigned mask) +{ + LASSERT(lh); + LASSERT(LHO(lh)); + + if (LHP(lh, hash)) + return LHP(lh, hash)(lh, key, mask); + + return -EOPNOTSUPP; +} + +static inline void * +lh_key(lustre_hash_t *lh, struct hlist_node *hnode) +{ + LASSERT(lh); + LASSERT(hnode); + LASSERT(LHO(lh)); + + if (LHP(lh, key)) + return LHP(lh, key)(hnode); + + return NULL; +} + +/* Returns 1 on a match, + * XXX: This would be better if it returned, -1, 0, or 1 for + * <, =, > respectivly. It could then be used to implement + * a LH_SORT feature flags which could keep each lustre hash + * bucket in order. 
This would increase insertion times + * but could reduce lookup times for deep chains. Ideally, + * the rehash should keep chain depth short but if that + * ends up not being the case this would be a nice feature. + */ +static inline int +lh_compare(lustre_hash_t *lh, void *key, struct hlist_node *hnode) +{ + LASSERT(lh); + LASSERT(hnode); + LASSERT(LHO(lh)); + + if (LHP(lh, compare)) + return LHP(lh, compare)(key, hnode); + + return -EOPNOTSUPP; +} + +static inline void * +lh_get(lustre_hash_t *lh, struct hlist_node *hnode) +{ + LASSERT(lh); + LASSERT(hnode); + LASSERT(LHO(lh)); + + if (LHP(lh, get)) + return LHP(lh, get)(hnode); + + return NULL; +} + +static inline void * +lh_put(lustre_hash_t *lh, struct hlist_node *hnode) +{ + LASSERT(lh); + LASSERT(hnode); + LASSERT(LHO(lh)); + + if (LHP(lh, put)) + return LHP(lh, put)(hnode); + + return NULL; +} + +static inline void +lh_exit(lustre_hash_t *lh, struct hlist_node *hnode) +{ + LASSERT(lh); + LASSERT(hnode); + LASSERT(LHO(lh)); + + if (LHP(lh, exit)) + return LHP(lh, exit)(hnode); +} + +/* Validate hnode references the correct key */ +static inline void +__lustre_hash_key_validate(lustre_hash_t *lh, void *key, + struct hlist_node *hnode) +{ + if (unlikely(lh->lh_flags & LH_DEBUG)) + LASSERT(lh_compare(lh, key, hnode)); +} + +/* Validate hnode is in the correct bucket */ +static inline void +__lustre_hash_bucket_validate(lustre_hash_t *lh, lustre_hash_bucket_t *lhb, + struct hlist_node *hnode) +{ + unsigned i; + + if (unlikely(lh->lh_flags & LH_DEBUG)) { + i = lh_hash(lh, lh_key(lh, hnode), lh->lh_cur_size - 1); + LASSERT(&lh->lh_buckets[i] == lhb); } - RETURN(find == 1 ? 
hash_item_node : NULL); -} - -static inline int -lustre_hash_delitem_nolock(struct lustre_class_hash_body *hash_body, - int hashent, struct hlist_node * hash_item) -{ - struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; - - hlist_del_init(hash_item); - - hop->lustre_hash_object_refcount_put(hash_item); - -#ifdef LUSTRE_HASH_DEBUG - hash_body->lchb_hash_tables[hashent].lhb_item_count--; - CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", - hash_body->hashname, hashent, - hash_body->lchb_hash_tables[hashent].lhb_item_count); -#endif - - RETURN(0); -} - -typedef void (*hash_item_iterate_cb) (void *obj, void *data); - -int lustre_hash_init(struct lustre_class_hash_body **hash_body, - char *hashname, __u32 hashsize, - struct lustre_hash_operations *hash_operations); -void lustre_hash_exit(struct lustre_class_hash_body **hash_body); -int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body, - void *key, struct hlist_node *actual_hnode); -void *lustre_hash_findadd_unique(struct lustre_class_hash_body *hash_body, - void *key, struct hlist_node *actual_hnode); -int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key, - struct hlist_node *actual_hnode); -int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body, - void *key); -int lustre_hash_delitem(struct lustre_class_hash_body *hash_body, void *key, - struct hlist_node *hash_item); -void lustre_hash_bucket_iterate(struct lustre_class_hash_body *hash_body, - void *key, hash_item_iterate_cb, - void *data); -void lustre_hash_iterate_all(struct lustre_class_hash_body *hash_body, - hash_item_iterate_cb, void *data); - -void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body, - void *key); - -__u32 djb2_hashfn(struct lustre_class_hash_body *hash_body, void* key, - size_t size); - -/* ( uuid <-> export ) hash operations define */ -__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body, void * key); -int 
uuid_hash_key_compare(void *key, struct hlist_node * compared_hnode); -void * uuid_export_refcount_get(struct hlist_node * actual_hnode); -void uuid_export_refcount_put(struct hlist_node * actual_hnode); - -/* ( nid <-> export ) hash operations define */ -__u32 nid_hashfn(struct lustre_class_hash_body *hash_body, void * key); -int nid_hash_key_compare(void *key, struct hlist_node * compared_hnode); -void * nid_export_refcount_get(struct hlist_node * actual_hnode); -void nid_export_refcount_put(struct hlist_node * actual_hnode); - -/* ( net_peer <-> connection ) hash operations define */ -__u32 conn_hashfn(struct lustre_class_hash_body *hash_body, void * key); -int conn_hash_key_compare(void *key, struct hlist_node * compared_hnode); -void * conn_refcount_get(struct hlist_node * actual_hnode); -void conn_refcount_put(struct hlist_node * actual_hnode); - -/* ( nid <-> nidstats ) hash operations define. uses nid_hashfn */ -int nidstats_hash_key_compare(void *key, struct hlist_node * compared_hnode); -void* nidstats_refcount_get(struct hlist_node * actual_hnode); -void nidstats_refcount_put(struct hlist_node * actual_hnode); -extern struct lustre_hash_operations nid_stat_hash_operations; - -#ifdef __KERNEL__ -/* ( lqs <-> qctxt ) hash operations define b=10600 */ -__u32 lqs_hashfn(struct lustre_class_hash_body *hash_body, void * key); -int lqs_hash_key_compare(void *key, struct hlist_node * compared_hnode); -void * lqs_refcount_get(struct hlist_node * actual_hnode); -void lqs_refcount_put(struct hlist_node * actual_hnode); -#endif +} + +static inline struct hlist_node * +__lustre_hash_bucket_lookup(lustre_hash_t *lh, + lustre_hash_bucket_t *lhb, void *key) +{ + struct hlist_node *hnode; + + hlist_for_each(hnode, &lhb->lhb_head) + if (lh_compare(lh, key, hnode)) + return hnode; + + return NULL; +} + +static inline void * +__lustre_hash_bucket_add(lustre_hash_t *lh, + lustre_hash_bucket_t *lhb, + struct hlist_node *hnode) +{ + hlist_add_head(hnode, &(lhb->lhb_head)); + 
atomic_inc(&lhb->lhb_count); + atomic_inc(&lh->lh_count); + + return lh_get(lh, hnode); +} + +static inline void * +__lustre_hash_bucket_del(lustre_hash_t *lh, + lustre_hash_bucket_t *lhb, + struct hlist_node *hnode) +{ + hlist_del_init(hnode); + LASSERT(atomic_read(&lhb->lhb_count) > 0); + atomic_dec(&lhb->lhb_count); + LASSERT(atomic_read(&lh->lh_count) > 0); + atomic_dec(&lh->lh_count); + + return lh_put(lh, hnode); +} + +/* Hash init/cleanup functions */ +lustre_hash_t *lustre_hash_init(char *name, unsigned int cur_size, + unsigned int max_size, + lustre_hash_ops_t *ops, int flags); +void lustre_hash_exit(lustre_hash_t *lh); + +/* Hash addition functions */ +void lustre_hash_add(lustre_hash_t *lh, void *key, + struct hlist_node *hnode); +int lustre_hash_add_unique(lustre_hash_t *lh, void *key, + struct hlist_node *hnode); +void *lustre_hash_findadd_unique(lustre_hash_t *lh, void *key, + struct hlist_node *hnode); + +/* Hash deletion functions */ +void *lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode); +void *lustre_hash_del_key(lustre_hash_t *lh, void *key); + +/* Hash lookup/for_each functions */ +void *lustre_hash_lookup(lustre_hash_t *lh, void *key); +typedef void (*lh_for_each_cb)(void *obj, void *data); +void lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb, void *data); +void lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb, void *data); +void lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb, void *data); +void lustre_hash_for_each_key(lustre_hash_t *lh, void *key, + lh_for_each_cb, void *data); + +/* Rehash - Theta is calculated to be the average chained + * hash depth assuming a perfectly uniform hash funcion. 
*/ +int lustre_hash_rehash(lustre_hash_t *lh, int size); +void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, + void *new_key, struct hlist_node *hnode); + + +static inline int +__lustre_hash_theta(lustre_hash_t *lh) +{ + return ((atomic_read(&lh->lh_count) * 1000) / lh->lh_cur_size); +} + +static inline void +__lustre_hash_set_theta(lustre_hash_t *lh, int min, int max) +{ + LASSERT(min < max); + lh->lh_min_theta = min; + lh->lh_max_theta = max; +} + +/* Generic debug formatting routines mainly for proc handler */ +int lustre_hash_debug_header(char *str, int size); +int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size); + +/* 2^31 + 2^29 - 2^25 + 2^22 - 2^19 - 2^16 + 1 */ +#define CFS_GOLDEN_RATIO_PRIME_32 0x9e370001UL +/* 2^63 + 2^61 - 2^57 + 2^54 - 2^51 - 2^18 + 1 */ +#define CFS_GOLDEN_RATIO_PRIME_64 0x9e37fffffffc0001ULL + +/* + * Generic djb2 hash algorithm for character arrays. + */ +static inline unsigned +lh_djb2_hash(void *key, size_t size, unsigned mask) +{ + unsigned i, hash = 5381; + + LASSERT(key != NULL); + + for (i = 0; i < size; i++) + hash = hash * 33 + ((char *)key)[i]; + + return (hash & mask); +} + +/* + * Generic u32 hash algorithm. + */ +static inline unsigned +lh_u32_hash(__u32 key, unsigned mask) +{ + return ((key * CFS_GOLDEN_RATIO_PRIME_32) & mask); +} + +/* + * Generic u64 hash algorithm. 
+ */ +static inline unsigned +lh_u64_hash(__u64 key, unsigned mask) +{ + return ((unsigned)(key * CFS_GOLDEN_RATIO_PRIME_64) & mask); +} + +#define lh_for_each_bucket(lh, lhb, pos) \ + for (pos = 0; \ + pos < lh->lh_cur_size && \ + ({ lhb = &lh->lh_buckets[pos]; 1; }); \ + pos++) #endif /* __CLASS_HASH_H */ diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index a2b6ebace338a0800d8249fc59bd00c9e50c5346..dc1e481b174863266d4f22166cbd7d1941983313 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -129,7 +129,6 @@ typedef unsigned short umode_t; #define set_page_private(page, v) ((page)->private = (v)) #endif - static inline void inter_module_put(void *a) { return; diff --git a/lustre/include/linux/lustre_compat25.h b/lustre/include/linux/lustre_compat25.h index 560f20bdb3aaa7a6649dbcd57c1e945da1ea7eb1..ceefbbefbec3b5b9c309cafc5deb168578873488 100644 --- a/lustre/include/linux/lustre_compat25.h +++ b/lustre/include/linux/lustre_compat25.h @@ -604,5 +604,10 @@ static inline long labs(long x) } #endif +/* Using kernel fls(). Userspace will use one defined in user-bitops.h. 
*/ +#ifndef __fls +#define __fls fls +#endif + #endif /* __KERNEL__ */ #endif /* _COMPAT25_H */ diff --git a/lustre/include/lprocfs_status.h b/lustre/include/lprocfs_status.h index 34d9106630027e65a8a06b88bfc84a48a9741952..4fefd59ac42f29b563b6c136658227f867733b73 100644 --- a/lustre/include/lprocfs_status.h +++ b/lustre/include/lprocfs_status.h @@ -505,6 +505,10 @@ extern int lprocfs_counter_write(struct file *file, const char *buffer, int lprocfs_obd_rd_recovery_status(char *page, char **start, off_t off, int count, int *eof, void *data); +/* lprocfs_status.c: hash statistics */ +int lprocfs_obd_rd_hash(char *page, char **start, off_t off, + int count, int *eof, void *data); + extern int lprocfs_seq_release(struct inode *, struct file *); /* in lprocfs_stat.c, to protect the private data for proc entries */ diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 65f2a58bf8afe546dfbac81e1ee094ec1e9c99a1..2c297445d557165e21275337cf59fa08e55941d7 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -499,8 +499,8 @@ struct ldlm_lock { struct ldlm_interval *l_tree_node; /* tree node for ldlm_extent */ - /* protected by led_lock */ - struct list_head l_export_chain; // per-export chain of locks + /* protected by per-bucket exp->exp_lock_hash locks */ + struct hlist_node l_exp_hash; /* per export hash of locks */ /* protected by lr_lock */ ldlm_mode_t l_req_mode; @@ -699,6 +699,8 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock); int ldlm_refresh_waiting_lock(struct ldlm_lock *lock); int ldlm_get_ref(void); void ldlm_put_ref(void); +int ldlm_init_export(struct obd_export *exp); +void ldlm_destroy_export(struct obd_export *exp); /* ldlm_lock.c */ ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res); diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 07b726d9adef7711d3d9b02a2956a922cbfd4b8c..c0074241f6331e47ad43d46719878d54cc3a7f3c 100644 --- 
a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -40,6 +40,7 @@ #include <lustre/lustre_idl.h> #include <lustre_dlm.h> #include <lprocfs_status.h> +#include <class_hash.h> /* Data stored per client in the last_rcvd file. In le32 order. */ struct mds_export_data { @@ -64,11 +65,6 @@ struct osc_creator { cfs_waitq_t oscc_waitq; /* creating procs wait on this */ }; -struct ldlm_export_data { - struct list_head led_held_locks; /* protected by led_lock below */ - spinlock_t led_lock; -}; - struct ec_export_data { /* echo client */ struct list_head eced_locks; }; @@ -122,7 +118,8 @@ struct obd_export { struct lprocfs_stats *exp_ops_stats; struct ptlrpc_connection *exp_connection; __u32 exp_conn_cnt; - struct ldlm_export_data exp_ldlm_data; + lustre_hash_t *exp_lock_hash; /* existing lock hash */ + spinlock_t exp_lock_hash_lock; struct list_head exp_outstanding_replies; time_t exp_last_request_time; struct list_head exp_req_replay_queue; diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 000e3e2a27a6e037b2ca3ecce17f2bb1bb44c0f8..160be77fca2e76782ffec7bd49c4b4141cf389ce 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -156,7 +156,6 @@ #define OST_MAXREPSIZE (9 * 1024) struct ptlrpc_connection { - struct list_head c_link; struct hlist_node c_hash; lnet_nid_t c_self; lnet_process_id_t c_peer; @@ -697,14 +696,13 @@ extern void reply_out_callback(lnet_event_t *ev); extern void server_bulk_callback (lnet_event_t *ev); /* ptlrpc/connection.c */ -void ptlrpc_dump_connections(void); -void ptlrpc_readdress_connection(struct ptlrpc_connection *, struct obd_uuid *); -struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer, - lnet_nid_t self, struct obd_uuid *uuid); -int ptlrpc_put_connection(struct ptlrpc_connection *c); +struct ptlrpc_connection *ptlrpc_connection_get(lnet_process_id_t peer, + lnet_nid_t self, + struct obd_uuid *uuid); +int ptlrpc_connection_put(struct ptlrpc_connection 
*c); struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *); -int ptlrpc_init_connection(void); -void ptlrpc_cleanup_connection(void); +int ptlrpc_connection_init(void); +void ptlrpc_connection_fini(void); extern lnet_pid_t ptl_get_pid(void); /* ptlrpc/niobuf.c */ diff --git a/lustre/include/lustre_quota.h b/lustre/include/lustre_quota.h index c2ded552ede3b37ba19fe0bb5918c229ed7711ec..77fdb73473f4460b40966dc8a53c937b522b10c5 100644 --- a/lustre/include/lustre_quota.h +++ b/lustre/include/lustre_quota.h @@ -51,6 +51,7 @@ #include <lustre_net.h> #include <lvfs.h> #include <obd_support.h> +#include <class_hash.h> struct obd_device; struct client_obd; @@ -248,9 +249,8 @@ struct lustre_quota_ctxt { * upper limitation for adjust block * qunit */ unsigned long lqc_btune_sz; /* See comment of lqc_itune_sz */ - struct lustre_class_hash_body *lqc_lqs_hash_body; - /* all lustre_qunit_size structure in - * it */ + struct lustre_hash *lqc_lqs_hash; /* all lustre_qunit_size structures */ + /* the values below are relative to how master change its qunit sizes */ unsigned long lqc_cqs_boundary_factor; /* this affects the boundary of * shrinking and enlarging qunit @@ -273,8 +273,6 @@ struct lustre_quota_ctxt { struct lprocfs_stats *lqc_stats; /* lquota statistics */ }; -#define LQC_HASH_BODY(qctxt) (qctxt->lqc_lqs_hash_body) - struct lustre_qunit_size { struct hlist_node lqs_hash; /* the hash entry */ unsigned int lqs_id; /* id of user/group */ @@ -297,6 +295,8 @@ struct lustre_qunit_size { cfs_time_t lqs_last_bshrink; /* time of last block shrink */ cfs_time_t lqs_last_ishrink; /* time of last inode shrink */ spinlock_t lqs_lock; + struct quota_adjust_qunit lqs_key; /* hash key */ + struct lustre_quota_ctxt *lqs_ctxt; /* quota ctxt */ }; #define LQS_IS_GRP(lqs) ((lqs)->lqs_flags & LQUOTA_FLAGS_GRP) @@ -310,15 +310,24 @@ struct lustre_qunit_size { static inline void lqs_getref(struct lustre_qunit_size *lqs) { atomic_inc(&lqs->lqs_refcount); + 
CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", + lqs, atomic_read(&lqs->lqs_refcount)); } static inline void lqs_putref(struct lustre_qunit_size *lqs) { - if (atomic_dec_and_test(&lqs->lqs_refcount)) { - spin_lock(&lqs->lqs_lock); - hlist_del_init(&lqs->lqs_hash); - spin_unlock(&lqs->lqs_lock); + LASSERT(atomic_read(&lqs->lqs_refcount) > 0); + + /* killing last ref, let's let hash table kill it */ + if (atomic_read(&lqs->lqs_refcount) == 1) { + lustre_hash_del(lqs->lqs_ctxt->lqc_lqs_hash, + &lqs->lqs_key, &lqs->lqs_hash); OBD_FREE_PTR(lqs); + } else { + atomic_dec(&lqs->lqs_refcount); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", + lqs, atomic_read(&lqs->lqs_refcount)); + } } diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 95278181c22cf9860f0e298687bc00a9b47787f0..7db51c5ac7c02453b466f162c130ae2741f63e97 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -819,11 +819,11 @@ struct obd_device { obd_inactive:1; /* device active/inactive * (for /proc/status only!!) */ /* uuid-export hash body */ - struct lustre_class_hash_body *obd_uuid_hash_body; + struct lustre_hash *obd_uuid_hash; /* nid-export hash body */ - struct lustre_class_hash_body *obd_nid_hash_body; + struct lustre_hash *obd_nid_hash; /* nid stats body */ - struct lustre_class_hash_body *obd_nid_stats_hash_body; + struct lustre_hash *obd_nid_stats_hash; struct list_head obd_nid_stats; atomic_t obd_refcount; cfs_waitq_t obd_refcount_waitq; diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 1ec8b7457c05c8c5a83475882621b6e062ccc034..08cc7142f9bea190546c362ed10090972a8fa17c 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -391,10 +391,11 @@ reprocess: new2->l_conn_export = lock->l_conn_export; if (lock->l_export != NULL) { new2->l_export = class_export_get(lock->l_export); - spin_lock(&new2->l_export->exp_ldlm_data.led_lock); - list_add(&new2->l_export_chain, - &new2->l_export->exp_ldlm_data.led_held_locks); - 
spin_unlock(&new2->l_export->exp_ldlm_data.led_lock); + if (new2->l_export->exp_lock_hash && + hlist_unhashed(&new2->l_exp_hash)) + lustre_hash_add(new2->l_export->exp_lock_hash, + &new2->l_remote_handle, + &new2->l_exp_hash); } if (*flags == LDLM_FL_WAIT_NOREPROC) { ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode); diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index e799c54e4326e6f4e02abb3ae8e31f5e39987fb1..5dc6af325860c3e729481101f9d6ccaa39fae814 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -118,7 +118,7 @@ out_free: if (imp_conn) OBD_FREE(imp_conn, sizeof(*imp_conn)); out_put: - ptlrpc_put_connection(ptlrpc_conn); + ptlrpc_connection_put(ptlrpc_conn); RETURN(rc); } @@ -161,20 +161,20 @@ int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid) GOTO(out, rc = -EBUSY); } - ptlrpc_put_connection(imp->imp_connection); + ptlrpc_connection_put(imp->imp_connection); imp->imp_connection = NULL; dlmexp = class_conn2export(&imp->imp_dlm_handle); if (dlmexp && dlmexp->exp_connection) { LASSERT(dlmexp->exp_connection == imp_conn->oic_conn); - ptlrpc_put_connection(dlmexp->exp_connection); + ptlrpc_connection_put(dlmexp->exp_connection); dlmexp->exp_connection = NULL; } } list_del(&imp_conn->oic_item); - ptlrpc_put_connection(imp_conn->oic_conn); + ptlrpc_connection_put(imp_conn->oic_conn); OBD_FREE(imp_conn, sizeof(*imp_conn)); CDEBUG(D_HA, "imp %p@%s: remove connection %s\n", imp, imp->imp_obd->obd_name, uuid->uuid); @@ -717,7 +717,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) goto dont_check_exports; spin_lock(&target->obd_dev_lock); - export = lustre_hash_get_object_by_key(target->obd_uuid_hash_body, &cluuid); + export = lustre_hash_lookup(target->obd_uuid_hash, &cluuid); if (export != NULL && export->exp_connecting) { /* bug 9635, et. al. 
*/ CWARN("%s: exp %p already connecting\n", @@ -888,17 +888,18 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) } if (export->exp_connection != NULL) - ptlrpc_put_connection(export->exp_connection); - export->exp_connection = ptlrpc_get_connection(req->rq_peer, + ptlrpc_connection_put(export->exp_connection); + export->exp_connection = ptlrpc_connection_get(req->rq_peer, req->rq_self, &remote_uuid); spin_lock(&target->obd_dev_lock); /* Export might be hashed already, e.g. if this is reconnect */ if (hlist_unhashed(&export->exp_nid_hash)) - lustre_hash_additem(export->exp_obd->obd_nid_hash_body, - &export->exp_connection->c_peer.nid, - &export->exp_nid_hash); + lustre_hash_add(export->exp_obd->obd_nid_hash, + &export->exp_connection->c_peer.nid, + &export->exp_nid_hash); + spin_unlock(&target->obd_dev_lock); if (lustre_msg_get_op_flags(req->rq_repmsg) & MSG_CONNECT_RECONNECT) { diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 2a53ef459fe8d32a75b02da58ddc62840a817b5e..def9fc88852d15572b4ae01e74ff683f55295a9f 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -265,11 +265,9 @@ int ldlm_lock_destroy_internal(struct ldlm_lock *lock) } lock->l_destroyed = 1; - if (lock->l_export) - spin_lock(&lock->l_export->exp_ldlm_data.led_lock); - list_del_init(&lock->l_export_chain); - if (lock->l_export) - spin_unlock(&lock->l_export->exp_ldlm_data.led_lock); + if (lock->l_export && lock->l_export->exp_lock_hash) + lustre_hash_del(lock->l_export->exp_lock_hash, + &lock->l_remote_handle, &lock->l_exp_hash); ldlm_lock_remove_from_lru(lock); class_handle_unhash(&lock->l_handle); @@ -343,7 +341,6 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) atomic_set(&lock->l_refc, 2); CFS_INIT_LIST_HEAD(&lock->l_res_link); CFS_INIT_LIST_HEAD(&lock->l_lru); - CFS_INIT_LIST_HEAD(&lock->l_export_chain); CFS_INIT_LIST_HEAD(&lock->l_pending_chain); CFS_INIT_LIST_HEAD(&lock->l_bl_ast); 
CFS_INIT_LIST_HEAD(&lock->l_cp_ast); @@ -351,6 +348,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) lock->l_blocking_lock = NULL; CFS_INIT_LIST_HEAD(&lock->l_sl_mode); CFS_INIT_LIST_HEAD(&lock->l_sl_policy); + CFS_INIT_HLIST_NODE(&lock->l_exp_hash); atomic_inc(&resource->lr_namespace->ns_locks); CFS_INIT_LIST_HEAD(&lock->l_handle.h_link); @@ -1588,30 +1586,29 @@ int ldlm_lock_set_data(struct lustre_handle *lockh, void *data) RETURN(0); } -void ldlm_cancel_locks_for_export(struct obd_export *exp) +void ldlm_cancel_locks_for_export_cb(void *obj, void *data) { - struct ldlm_lock *lock; - struct ldlm_resource *res; + struct obd_export *exp = data; + struct ldlm_lock *lock = obj; + struct ldlm_resource *res; - spin_lock(&exp->exp_ldlm_data.led_lock); - while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) { - lock = list_entry(exp->exp_ldlm_data.led_held_locks.next, - struct ldlm_lock, l_export_chain); - res = ldlm_resource_getref(lock->l_resource); - LDLM_LOCK_GET(lock); - spin_unlock(&exp->exp_ldlm_data.led_lock); + res = ldlm_resource_getref(lock->l_resource); + LDLM_LOCK_GET(lock); - LDLM_DEBUG(lock, "export %p", exp); - ldlm_res_lvbo_update(res, NULL, 0, 1); + LDLM_DEBUG(lock, "export %p", exp); + ldlm_res_lvbo_update(res, NULL, 0, 1); - ldlm_lock_cancel(lock); - ldlm_reprocess_all(res); + ldlm_lock_cancel(lock); + ldlm_reprocess_all(res); - ldlm_resource_putref(res); - LDLM_LOCK_PUT(lock); - spin_lock(&exp->exp_ldlm_data.led_lock); - } - spin_unlock(&exp->exp_ldlm_data.led_lock); + ldlm_resource_putref(res); + LDLM_LOCK_PUT(lock); +} + +void ldlm_cancel_locks_for_export(struct obd_export *exp) +{ + lustre_hash_for_each_empty(exp->exp_lock_hash, + ldlm_cancel_locks_for_export_cb, exp); } struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index ad41a12003bb692aff9dbfedf96adb96fd36efa7..7dfbbd1a2c19f1f1899e8e2ea567e7cbca391170 100644 --- 
a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -829,25 +829,6 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) RETURN(rc); } -static struct ldlm_lock * -find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl) -{ - struct list_head *iter; - - spin_lock(&exp->exp_ldlm_data.led_lock); - list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { - struct ldlm_lock *lock; - lock = list_entry(iter, struct ldlm_lock, l_export_chain); - if (lock->l_remote_handle.cookie == remote_hdl->cookie) { - LDLM_LOCK_GET(lock); - spin_unlock(&exp->exp_ldlm_data.led_lock); - return lock; - } - } - spin_unlock(&exp->exp_ldlm_data.led_lock); - return NULL; -} - static void ldlm_svc_get_eopc(struct ldlm_request *dlm_req, struct lprocfs_stats *srv_stats) { @@ -972,8 +953,9 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, #endif if (flags & LDLM_FL_REPLAY) { - lock = find_existing_lock(req->rq_export, - &dlm_req->lock_handle[0]); + /* Find an existing lock in the per-export lock hash */ + lock = lustre_hash_lookup(req->rq_export->exp_lock_hash, + (void *)&dlm_req->lock_handle[0]); if (lock != NULL) { DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie " LPX64, lock->l_handle.h_cookie); @@ -1003,10 +985,11 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, GOTO(out, rc = -ENOTCONN); } lock->l_export = class_export_get(req->rq_export); - spin_lock(&lock->l_export->exp_ldlm_data.led_lock); - list_add(&lock->l_export_chain, - &lock->l_export->exp_ldlm_data.led_held_locks); - spin_unlock(&lock->l_export->exp_ldlm_data.led_lock); + + if (lock->l_export->exp_lock_hash) + lustre_hash_add(lock->l_export->exp_lock_hash, + &lock->l_remote_handle, + &lock->l_exp_hash); existing_lock: @@ -1892,6 +1875,88 @@ static int ldlm_bl_thread_main(void *arg) #endif +/* + * Export handle<->lock hash operations. 
+ */ +static unsigned +ldlm_export_lock_hash(lustre_hash_t *lh, void *key, unsigned mask) +{ + return lh_u64_hash(((struct lustre_handle *)key)->cookie, mask); +} + +static void * +ldlm_export_lock_key(struct hlist_node *hnode) +{ + struct ldlm_lock *lock; + ENTRY; + + lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); + RETURN(&lock->l_remote_handle); +} + +static int +ldlm_export_lock_compare(void *key, struct hlist_node *hnode) +{ + ENTRY; + RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key)); +} + +static void * +ldlm_export_lock_get(struct hlist_node *hnode) +{ + struct ldlm_lock *lock; + ENTRY; + + lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); + LDLM_LOCK_GET(lock); + + RETURN(lock); +} + +static void * +ldlm_export_lock_put(struct hlist_node *hnode) +{ + struct ldlm_lock *lock; + ENTRY; + + lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); + LDLM_LOCK_PUT(lock); + + RETURN(lock); +} + +static lustre_hash_ops_t ldlm_export_lock_ops = { + .lh_hash = ldlm_export_lock_hash, + .lh_key = ldlm_export_lock_key, + .lh_compare = ldlm_export_lock_compare, + .lh_get = ldlm_export_lock_get, + .lh_put = ldlm_export_lock_put +}; + +int ldlm_init_export(struct obd_export *exp) +{ + ENTRY; + + exp->exp_lock_hash = + lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid), + 128, 65536, &ldlm_export_lock_ops, LH_REHASH); + + if (!exp->exp_lock_hash) + RETURN(-ENOMEM); + + RETURN(0); +} +EXPORT_SYMBOL(ldlm_init_export); + +void ldlm_destroy_export(struct obd_export *exp) +{ + ENTRY; + lustre_hash_exit(exp->exp_lock_hash); + exp->exp_lock_hash = NULL; + EXIT; +} +EXPORT_SYMBOL(ldlm_destroy_export); + static int ldlm_setup(void); static int ldlm_cleanup(void); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 68c24dce3b6d6cfc81f03cc06e1aebee2390c8a4..d8a88fe8170739e6a2d140a53cec76379be39d5c 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -371,6 +371,7 @@ int ldlm_cli_enqueue_fini(struct 
obd_export *exp, struct ptlrpc_request *req, { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; int is_replay = *flags & LDLM_FL_REPLAY; + struct lustre_handle old_hash_key; struct ldlm_lock *lock; struct ldlm_reply *reply; int cleanup_phase = 1; @@ -422,7 +423,15 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, cleanup_phase = 0; lock_res_and_lock(lock); + old_hash_key = lock->l_remote_handle; lock->l_remote_handle = reply->lock_handle; + + /* Key change rehash lock in per-export hash with new key */ + if (exp->exp_lock_hash) + lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key, + &lock->l_remote_handle, + &lock->l_exp_hash); + *flags = reply->lock_flags; lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS; /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match() @@ -1862,15 +1871,16 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure) static int replay_lock_interpret(struct ptlrpc_request *req, struct ldlm_async_args *aa, int rc) { - struct ldlm_lock *lock; - struct ldlm_reply *reply; + struct lustre_handle old_hash_key; + struct ldlm_lock *lock; + struct ldlm_reply *reply; + struct obd_export *exp; ENTRY; atomic_dec(&req->rq_import->imp_replay_inflight); if (rc != ELDLM_OK) GOTO(out, rc); - reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { @@ -1888,7 +1898,16 @@ static int replay_lock_interpret(struct ptlrpc_request *req, GOTO(out, rc = -ESTALE); } + old_hash_key = lock->l_remote_handle; lock->l_remote_handle = reply->lock_handle; + + /* Key change rehash lock in per-export hash with new key */ + exp = req->rq_export; + if (exp && exp->exp_lock_hash) + lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key, + &lock->l_remote_handle, + &lock->l_exp_hash); + LDLM_DEBUG(lock, "replayed lock:"); ptlrpc_import_recovery_state_machine(req->rq_import); LDLM_LOCK_PUT(lock); diff --git a/lustre/mds/handler.c 
b/lustre/mds/handler.c index 9b43e7d138537764bc39c8ef0b83de33ae3ee110..bf31f947b4c667d6dea4b1e07f942bf3d1d26e17 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -403,6 +403,7 @@ out: int mds_init_export(struct obd_export *exp) { struct mds_export_data *med = &exp->exp_mds_data; + ENTRY; INIT_LIST_HEAD(&med->med_open_head); spin_lock_init(&med->med_open_lock); @@ -411,7 +412,7 @@ int mds_init_export(struct obd_export *exp) exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); - RETURN(0); + RETURN(ldlm_init_export(exp)); } static int mds_destroy_export(struct obd_export *export) @@ -428,6 +429,7 @@ static int mds_destroy_export(struct obd_export *export) med = &export->exp_mds_data; target_destroy_export(export); + ldlm_destroy_export(export); if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid)) RETURN(0); @@ -2386,29 +2388,26 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset, struct ldlm_request *dlmreq = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq)); struct lustre_handle remote_hdl = dlmreq->lock_handle[0]; - struct list_head *iter; + struct ldlm_lock *lock; if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) return; - spin_lock(&exp->exp_ldlm_data.led_lock); - list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { - struct ldlm_lock *lock; - lock = list_entry(iter, struct ldlm_lock, l_export_chain); - if (lock == new_lock) - continue; - if (lock->l_remote_handle.cookie == remote_hdl.cookie) { + lock = lustre_hash_lookup(exp->exp_lock_hash, &remote_hdl); + if (lock) { + if (lock != new_lock) { lockh->cookie = lock->l_handle.h_cookie; LDLM_DEBUG(lock, "restoring lock cookie"); - DEBUG_REQ(D_DLMTRACE, req,"restoring lock cookie "LPX64, - lockh->cookie); + DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie " + LPX64, lockh->cookie); if (old_lock) *old_lock = LDLM_LOCK_GET(lock); - spin_unlock(&exp->exp_ldlm_data.led_lock); + + lh_put(exp->exp_lock_hash, &lock->l_exp_hash); return; } + 
lh_put(exp->exp_lock_hash, &lock->l_exp_hash); } - spin_unlock(&exp->exp_ldlm_data.led_lock); /* If the xid matches, then we know this is a resent request, * and allow it. (It's probably an OPEN, for which we don't @@ -2623,18 +2622,15 @@ static int mds_intent_policy(struct ldlm_namespace *ns, new_lock->l_writers = 0; new_lock->l_export = class_export_get(req->rq_export); - spin_lock(&req->rq_export->exp_ldlm_data.led_lock); - list_add(&new_lock->l_export_chain, - &new_lock->l_export->exp_ldlm_data.led_held_locks); - spin_unlock(&req->rq_export->exp_ldlm_data.led_lock); - new_lock->l_blocking_ast = lock->l_blocking_ast; new_lock->l_completion_ast = lock->l_completion_ast; + new_lock->l_flags &= ~LDLM_FL_LOCAL; memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle, sizeof(lock->l_remote_handle)); - new_lock->l_flags &= ~LDLM_FL_LOCAL; + lustre_hash_add(new_lock->l_export->exp_lock_hash, + &new_lock->l_remote_handle, &new_lock->l_exp_hash); unlock_res_and_lock(new_lock); LDLM_LOCK_PUT(new_lock); diff --git a/lustre/mds/lproc_mds.c b/lustre/mds/lproc_mds.c index 1f43922e454f63819ace59089f105cb35169443e..6a813599fcb18ae82c5a4db16d54b91bdc94e1e4 100644 --- a/lustre/mds/lproc_mds.c +++ b/lustre/mds/lproc_mds.c @@ -416,6 +416,7 @@ struct lprocfs_vars lprocfs_mds_obd_vars[] = { { "fstype", lprocfs_rd_fstype, 0, 0 }, { "mntdev", lprocfs_mds_rd_mntdev, 0, 0 }, { "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 }, + { "hash_stats", lprocfs_obd_rd_hash, 0, 0 }, { "evict_client", 0, lprocfs_mds_wr_evict_client, 0 }, { "evict_ost_nids", lprocfs_mds_rd_evictostnids, lprocfs_mds_wr_evictostnids, 0 }, diff --git a/lustre/mgs/lproc_mgs.c b/lustre/mgs/lproc_mgs.c index b745bbe545000001696f81bbd4875dad178cea78..cee1ce4764b32d9b5b9b52f83f74ff259c24a696 100644 --- a/lustre/mgs/lproc_mgs.c +++ b/lustre/mgs/lproc_mgs.c @@ -175,6 +175,7 @@ struct lprocfs_vars lprocfs_mgs_obd_vars[] = { { "fstype", lprocfs_rd_fstype, 0, 0 }, { "mntdev", lprocfs_mgs_rd_mntdev, 0, 0 }, { 
"num_exports", lprocfs_rd_num_exports, 0, 0 }, + { "hash_stats", lprocfs_obd_rd_hash, 0, 0 }, { "evict_client", 0, lprocfs_wr_evict_client, 0 }, { 0 } }; diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index a3890846737fcd4d09f04ed7970e233832bf0a6a..0f0cb582964d228f1f6d06281bd229288b28c55b 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -650,11 +650,17 @@ int mgs_handle(struct ptlrpc_request *req) RETURN(0); } +static inline int mgs_init_export(struct obd_export *exp) +{ + return ldlm_init_export(exp); +} + static inline int mgs_destroy_export(struct obd_export *exp) { ENTRY; target_destroy_export(exp); + ldlm_destroy_export(exp); mgs_client_free(exp); RETURN(0); @@ -767,6 +773,7 @@ static struct obd_ops mgs_obd_ops = { .o_setup = mgs_setup, .o_precleanup = mgs_precleanup, .o_cleanup = mgs_cleanup, + .o_init_export = mgs_init_export, .o_destroy_export = mgs_destroy_export, .o_iocontrol = mgs_iocontrol, }; diff --git a/lustre/obdclass/class_hash.c b/lustre/obdclass/class_hash.c index a04a10fa8fe71d4baa34d76c8d76fbaf79ac206c..919fa4ac1e52fd4bb343a0a9ac9b685350128cda 100644 --- a/lustre/obdclass/class_hash.c +++ b/lustre/obdclass/class_hash.c @@ -38,6 +38,14 @@ * Implement a hash class for hash process in lustre system. 
* * Author: YuZhangyong <yzy@clusterfs.com> + * + * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov> + * - Simplified API and improved documentation + * - Added per-hash feature flags: + * * LH_DEBUG additional validation + * * LH_REHASH dynamic rehashing + * - Added per-hash statistics + * - General performance enhancements */ #ifndef __KERNEL__ @@ -45,741 +53,705 @@ #include <obd.h> #endif -#include <obd_class.h> #include <class_hash.h> -#include <lustre_export.h> -#include <obd_support.h> -#include <lustre_net.h> -#include <lustre_quota.h> - -int lustre_hash_init(struct lustre_class_hash_body **hash_body_new, - char *hashname, __u32 hashsize, - struct lustre_hash_operations *hash_operations) -{ - int i, n = 0; - struct lustre_class_hash_body *hash_body = NULL; - LASSERT(hashsize > 0); - LASSERT(hash_operations != NULL); +/** + * Initialize new lustre hash, where: + * @name - Descriptive hash name + * @cur_size - Initial hash table size + * @max_size - Maximum allowed hash table resize + * @ops - Registered hash table operations + * @flags - LH_REHASH enable dynamic hash resizing + * - LH_SORT enable chained hash sort + */ +lustre_hash_t * +lustre_hash_init(char *name, unsigned int cur_size, unsigned int max_size, + lustre_hash_ops_t *ops, int flags) +{ + lustre_hash_t *lh; + int i; ENTRY; - - i = hashsize; - while (i != 0) { - if (i & 0x1) - n++; - i >>= 1; - } - - LASSERTF(n == 1, "hashsize %u isn't 2^n\n", hashsize); - - /* alloc space for hash_body */ - OBD_ALLOC(hash_body, sizeof(*hash_body)); - - if (hash_body == NULL) { - CERROR("Cannot alloc space for hash body, hashname = %s \n", - hashname); - RETURN(-ENOMEM); - } - - LASSERT(hashname != NULL && - strlen(hashname) <= sizeof(hash_body->hashname)); - strcpy(hash_body->hashname, hashname); - hash_body->lchb_hash_max_size = hashsize; - hash_body->lchb_hash_operations = hash_operations; - - /* alloc space for the hash tables */ - OBD_ALLOC(hash_body->lchb_hash_tables, 
sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size); - - if (hash_body->lchb_hash_tables == NULL) { - OBD_FREE(hash_body, sizeof(*hash_body)); - CERROR("Cannot alloc space for hashtables, hashname = %s \n", - hash_body->hashname); - RETURN(-ENOMEM); + + LASSERT(name != NULL); + LASSERT(ops != NULL); + + /* + * Ensure hash is a power of two to allow the use of a bitmask + * in the hash function instead of a more expensive modulus. + */ + LASSERTF(cur_size && (cur_size & (cur_size - 1)) == 0, + "Size (%u) is not power of 2\n", cur_size); + LASSERTF(max_size && (max_size & (max_size - 1)) == 0, + "Size (%u) is not power of 2\n", max_size); + + OBD_ALLOC_PTR(lh); + if (!lh) + RETURN(NULL); + + strncpy(lh->lh_name, name, sizeof(lh->lh_name)); + atomic_set(&lh->lh_rehash_count, 0); + atomic_set(&lh->lh_count, 0); + rwlock_init(&lh->lh_rwlock); + lh->lh_cur_size = cur_size; + lh->lh_min_size = cur_size; + lh->lh_max_size = max_size; + lh->lh_min_theta = 500; /* theta * 1000 */ + lh->lh_max_theta = 2000; /* theta * 1000 */ + lh->lh_ops = ops; + lh->lh_flags = flags; + + OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size); + if (!lh->lh_buckets) { + OBD_FREE_PTR(lh); + RETURN(NULL); } - - spin_lock_init(&hash_body->lchb_lock); /* initialize the body lock */ - - for(i = 0 ; i < hash_body->lchb_hash_max_size; i++) { - /* initial the bucket lock and list_head */ - INIT_HLIST_HEAD(&hash_body->lchb_hash_tables[i].lhb_head); - spin_lock_init(&hash_body->lchb_hash_tables[i].lhb_lock); + + for (i = 0; i < lh->lh_cur_size; i++) { + INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head); + rwlock_init(&lh->lh_buckets[i].lhb_rwlock); + atomic_set(&lh->lh_buckets[i].lhb_count, 0); } - *hash_body_new = hash_body; - - RETURN(0); + + return lh; } EXPORT_SYMBOL(lustre_hash_init); - -void lustre_hash_exit(struct lustre_class_hash_body **new_hash_body) + +/** + * Cleanup lustre hash @lh. 
+ */ +void +lustre_hash_exit(lustre_hash_t *lh) { - int i; - struct lustre_class_hash_body *hash_body = NULL; + lustre_hash_bucket_t *lhb; + struct hlist_node *hnode; + struct hlist_node *pos; + int i; ENTRY; - - hash_body = *new_hash_body; - - if (hash_body == NULL) { - CWARN("hash body has been deleted\n"); - goto out_hash; - } - - spin_lock(&hash_body->lchb_lock); /* lock the hash tables */ - - if (hash_body->lchb_hash_tables == NULL ) { - spin_unlock(&hash_body->lchb_lock); - CWARN("hash tables has been deleted\n"); - goto out_hash; - } - - for( i = 0; i < hash_body->lchb_hash_max_size; i++ ) { - struct lustre_hash_bucket * bucket; - struct hlist_node * actual_hnode, *pos; - - bucket = &hash_body->lchb_hash_tables[i]; - spin_lock(&bucket->lhb_lock); /* lock the bucket */ - hlist_for_each_safe(actual_hnode, pos, &(bucket->lhb_head)) { - lustre_hash_delitem_nolock(hash_body, i, actual_hnode); + + if (!lh) + return; + + write_lock(&lh->lh_rwlock); + + lh_for_each_bucket(lh, lhb, i) { + write_lock(&lhb->lhb_rwlock); + hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) { + __lustre_hash_bucket_validate(lh, lhb, hnode); + __lustre_hash_bucket_del(lh, lhb, hnode); + lh_exit(lh, hnode); } - spin_unlock(&bucket->lhb_lock); + + LASSERT(hlist_empty(&(lhb->lhb_head))); + LASSERT(atomic_read(&lhb->lhb_count) == 0); + write_unlock(&lhb->lhb_rwlock); } + + OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size); + LASSERT(atomic_read(&lh->lh_count) == 0); + write_unlock(&lh->lh_rwlock); + + OBD_FREE_PTR(lh); + EXIT; +} +EXPORT_SYMBOL(lustre_hash_exit); - /* free the hash_tables's memory space */ - OBD_FREE(hash_body->lchb_hash_tables, - sizeof(*hash_body->lchb_hash_tables) * - hash_body->lchb_hash_max_size); +static inline unsigned int lustre_hash_rehash_size(lustre_hash_t *lh) +{ + if (!(lh->lh_flags & LH_REHASH)) + return 0; - hash_body->lchb_hash_tables = NULL; + if ((lh->lh_cur_size < lh->lh_max_size) && + (__lustre_hash_theta(lh) > lh->lh_max_theta)) + return 
MIN(lh->lh_cur_size * 2, lh->lh_max_size); - spin_unlock(&hash_body->lchb_lock); + if ((lh->lh_cur_size > lh->lh_min_size) && + (__lustre_hash_theta(lh) < lh->lh_min_theta)) + return MAX(lh->lh_cur_size / 2, lh->lh_min_size); -out_hash : - /* free the hash_body's memory space */ - if (hash_body != NULL) { - OBD_FREE(hash_body, sizeof(*hash_body)); - *new_hash_body = NULL; - } + return 0; +} + +/** + * Add item @hnode to lustre hash @lh using @key. The registered + * ops->lh_get function will be called when the item is added. + */ +void +lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode) +{ + lustre_hash_bucket_t *lhb; + int size; + unsigned i; + ENTRY; + + __lustre_hash_key_validate(lh, key, hnode); + + read_lock(&lh->lh_rwlock); + i = lh_hash(lh, key, lh->lh_cur_size - 1); + lhb = &lh->lh_buckets[i]; + LASSERT(i < lh->lh_cur_size); + LASSERT(hlist_unhashed(hnode)); + + write_lock(&lhb->lhb_rwlock); + __lustre_hash_bucket_add(lh, lhb, hnode); + write_unlock(&lhb->lhb_rwlock); + + size = lustre_hash_rehash_size(lh); + read_unlock(&lh->lh_rwlock); + if (size) + lustre_hash_rehash(lh, size); + EXIT; } -EXPORT_SYMBOL(lustre_hash_exit); +EXPORT_SYMBOL(lustre_hash_add); -/* - * only allow unique @key in hashtables, if the same @key has existed - * in hashtables, it will return with fails. 
+static struct hlist_node * +lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key, + struct hlist_node *hnode) +{ + struct hlist_node *ehnode; + lustre_hash_bucket_t *lhb; + int size; + unsigned i; + ENTRY; + + __lustre_hash_key_validate(lh, key, hnode); + + read_lock(&lh->lh_rwlock); + i = lh_hash(lh, key, lh->lh_cur_size - 1); + lhb = &lh->lh_buckets[i]; + LASSERT(i < lh->lh_cur_size); + LASSERT(hlist_unhashed(hnode)); + + write_lock(&lhb->lhb_rwlock); + ehnode = __lustre_hash_bucket_lookup(lh, lhb, key); + if (ehnode) { + lh_get(lh, ehnode); + } else { + __lustre_hash_bucket_add(lh, lhb, hnode); + ehnode = hnode; + } + write_unlock(&lhb->lhb_rwlock); + + size = lustre_hash_rehash_size(lh); + read_unlock(&lh->lh_rwlock); + if (size) + lustre_hash_rehash(lh, size); + + RETURN(ehnode); +} + +/** + * Add item @hnode to lustre hash @lh using @key. The registered + * ops->lh_get function will be called if the item was added. + * Returns 0 on success or -EALREADY on key collisions. 
*/ -int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body, - void *key, struct hlist_node *actual_hnode) +int +lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode) { - int hashent; - struct lustre_hash_bucket *bucket = NULL; - struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; + struct hlist_node *ehnode; ENTRY; - - LASSERT(hlist_unhashed(actual_hnode)); - hashent = hop->lustre_hashfn(hash_body, key); - - /* get the hash-bucket and lock it */ - bucket = &hash_body->lchb_hash_tables[hashent]; - spin_lock(&bucket->lhb_lock); - - if ( (lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, key)) != NULL) { - /* the added-item exist in hashtables, so cannot add it again */ - spin_unlock(&bucket->lhb_lock); - - CWARN("Already found the key in hash [%s]\n", - hash_body->hashname); + + ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode); + if (ehnode != hnode) RETURN(-EALREADY); - } - - hlist_add_head(actual_hnode, &(bucket->lhb_head)); - -#ifdef LUSTRE_HASH_DEBUG - /* hash distribute debug */ - hash_body->lchb_hash_tables[hashent].lhb_item_count++; - CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", - hash_body->hashname, hashent, - hash_body->lchb_hash_tables[hashent].lhb_item_count); -#endif - hop->lustre_hash_object_refcount_get(actual_hnode); - - spin_unlock(&bucket->lhb_lock); - + RETURN(0); } -EXPORT_SYMBOL(lustre_hash_additem_unique); - -/* - * only allow unique @key in hashtables, if the same @key has existed - * in hashtables, it will return with fails. +EXPORT_SYMBOL(lustre_hash_add_unique); + +/** + * Add item @hnode to lustre hash @lh using @key. If this @key + * already exists in the hash then ops->lh_get will be called on the + * conflicting entry and that entry will be returned to the caller. + * Otherwise ops->lh_get is called on the item which was added. 
*/ -void* lustre_hash_findadd_unique(struct lustre_class_hash_body *hash_body, - void *key, struct hlist_node *actual_hnode) +void * +lustre_hash_findadd_unique(lustre_hash_t *lh, void *key, + struct hlist_node *hnode) { - int hashent; - struct lustre_hash_bucket *bucket = NULL; - struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; - struct hlist_node * hash_item_hnode = NULL; - void *obj; + struct hlist_node *ehnode; + void *obj; ENTRY; - - LASSERT(hlist_unhashed(actual_hnode)); - hashent = hop->lustre_hashfn(hash_body, key); - - /* get the hash-bucket and lock it */ - bucket = &hash_body->lchb_hash_tables[hashent]; - spin_lock(&bucket->lhb_lock); - - hash_item_hnode = lustre_hash_getitem_in_bucket_nolock(hash_body, - hashent, key); - if ( hash_item_hnode != NULL) { - /* the added-item exist in hashtables, so cannot add it again */ - obj = hop->lustre_hash_object_refcount_get(hash_item_hnode); - spin_unlock(&bucket->lhb_lock); - RETURN(obj); - } - - hlist_add_head(actual_hnode, &(bucket->lhb_head)); - -#ifdef LUSTRE_HASH_DEBUG - /* hash distribute debug */ - hash_body->lchb_hash_tables[hashent].lhb_item_count++; - CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", - hash_body->hashname, hashent, - hash_body->lchb_hash_tables[hashent].lhb_item_count); -#endif - obj = hop->lustre_hash_object_refcount_get(actual_hnode); - - spin_unlock(&bucket->lhb_lock); - + + ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode); + obj = lh_get(lh, ehnode); + lh_put(lh, ehnode); RETURN(obj); } EXPORT_SYMBOL(lustre_hash_findadd_unique); - -/* - * this version of additem, it allow multi same @key <key, value> in hashtables. - * in this additem version, we don't need to check if exist same @key in hash - * tables, we only add it to related hashbucket. - * example: maybe same nid will be related to multi difference export + +/** + * Delete item @hnode from the lustre hash @lh using @key. 
The @key + * is required to ensure the correct hash bucket is locked since there + * is no direct linkage from the item to the bucket. The object + * removed from the hash will be returned and ops->lh_put is called + * on the removed object. */ -int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key, - struct hlist_node *actual_hnode) +void * +lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode) { - int hashent; - struct lustre_hash_bucket *bucket = NULL; - struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; + lustre_hash_bucket_t *lhb; + int size; + unsigned i; + void *obj; ENTRY; - - LASSERT(hlist_unhashed(actual_hnode)); - - hashent = hop->lustre_hashfn(hash_body, key); - - /* get the hashbucket and lock it */ - bucket = &hash_body->lchb_hash_tables[hashent]; - spin_lock(&bucket->lhb_lock); - - hlist_add_head(actual_hnode, &(bucket->lhb_head)); - -#ifdef LUSTRE_HASH_DEBUG - /* hash distribute debug */ - hash_body->lchb_hash_tables[hashent].lhb_item_count++; - CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", - hash_body->hashname, hashent, - hash_body->lchb_hash_tables[hashent].lhb_item_count); -#endif - hop->lustre_hash_object_refcount_get(actual_hnode); - - spin_unlock(&bucket->lhb_lock); - - RETURN(0); + + __lustre_hash_key_validate(lh, key, hnode); + + read_lock(&lh->lh_rwlock); + i = lh_hash(lh, key, lh->lh_cur_size - 1); + lhb = &lh->lh_buckets[i]; + LASSERT(i < lh->lh_cur_size); + LASSERT(!hlist_unhashed(hnode)); + + write_lock(&lhb->lhb_rwlock); + obj = __lustre_hash_bucket_del(lh, lhb, hnode); + write_unlock(&lhb->lhb_rwlock); + + size = lustre_hash_rehash_size(lh); + read_unlock(&lh->lh_rwlock); + if (size) + lustre_hash_rehash(lh, size); + + RETURN(obj); } -EXPORT_SYMBOL(lustre_hash_additem); - - -/* - * this version of delitem will delete a hashitem with given @key, - * we need to search the <@key, @value> in hashbucket with @key, - * if match, the hashitem will be delete. 
- * we have a no-search version of delitem, it will directly delete a hashitem, - * doesn't need to search it in hashtables, so it is a O(1) delete. +EXPORT_SYMBOL(lustre_hash_del); + +/** + * Delete item given @key in lustre hash @lh. The first @key found in + * the hash will be removed, if the key exists multiple times in the hash + * @lh this function must be called once per key. The removed object + * will be returned and ops->lh_put is called on the removed object. */ -int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body, - void *key) +void * +lustre_hash_del_key(lustre_hash_t *lh, void *key) { - int hashent ; - struct hlist_node * hash_item; - struct lustre_hash_bucket *bucket = NULL; - struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; - int retval = 0; + struct hlist_node *hnode; + lustre_hash_bucket_t *lhb; + int size; + unsigned i; + void *obj = NULL; ENTRY; - - hashent = hop->lustre_hashfn(hash_body, key); - - /* first, lock the hashbucket */ - bucket = &hash_body->lchb_hash_tables[hashent]; - spin_lock(&bucket->lhb_lock); - - /* get the hash_item from hash_bucket */ - hash_item = lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, - key); - - if (hash_item == NULL) { - spin_unlock(&bucket->lhb_lock); - RETURN(-ENOENT); - } - - /* call delitem_nolock() to delete the hash_item */ - retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item); - - spin_unlock(&bucket->lhb_lock); - - RETURN(retval); + + read_lock(&lh->lh_rwlock); + i = lh_hash(lh, key, lh->lh_cur_size - 1); + lhb = &lh->lh_buckets[i]; + LASSERT(i < lh->lh_cur_size); + + write_lock(&lhb->lhb_rwlock); + hnode = __lustre_hash_bucket_lookup(lh, lhb, key); + if (hnode) + obj = __lustre_hash_bucket_del(lh, lhb, hnode); + + write_unlock(&lhb->lhb_rwlock); + + size = lustre_hash_rehash_size(lh); + read_unlock(&lh->lh_rwlock); + if (size) + lustre_hash_rehash(lh, size); + + RETURN(obj); } -EXPORT_SYMBOL(lustre_hash_delitem_by_key); - -/* - * the O(1) 
version of delete hash item, - * it will directly delete the hashitem with given @hash_item, - * the parameter @key used to get the relation hash bucket and lock it. +EXPORT_SYMBOL(lustre_hash_del_key); + +/** + * Lookup an item using @key in the lustre hash @lh and return it. + * If the @key is found in the hash lh->lh_get() is called and the + * matching object is returned. It is the caller's responsibility + * to call the counterpart ops->lh_put using the lh_put() macro + * when finished with the object. If the @key was not found + * in the hash @lh NULL is returned. */ -int lustre_hash_delitem(struct lustre_class_hash_body *hash_body, - void *key, struct hlist_node * hash_item) -{ - int hashent = 0; - int retval = 0; - struct lustre_hash_bucket *bucket = NULL; - struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; +void * +lustre_hash_lookup(lustre_hash_t *lh, void *key) +{ + struct hlist_node *hnode; + lustre_hash_bucket_t *lhb; + unsigned i; + void *obj = NULL; ENTRY; - - hashent = hop->lustre_hashfn(hash_body, key); - - bucket = &hash_body->lchb_hash_tables[hashent]; - spin_lock(&bucket->lhb_lock); - - /* call delitem_nolock() to delete the hash_item */ - retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item); - - spin_unlock(&bucket->lhb_lock); - - RETURN(retval); + + read_lock(&lh->lh_rwlock); + i = lh_hash(lh, key, lh->lh_cur_size - 1); + lhb = &lh->lh_buckets[i]; + LASSERT(i < lh->lh_cur_size); + + read_lock(&lhb->lhb_rwlock); + hnode = __lustre_hash_bucket_lookup(lh, lhb, key); + if (hnode) + obj = lh_get(lh, hnode); + + read_unlock(&lhb->lhb_rwlock); + read_unlock(&lh->lh_rwlock); + + RETURN(obj); } -EXPORT_SYMBOL(lustre_hash_delitem); - -void lustre_hash_bucket_iterate(struct lustre_class_hash_body *hash_body, - void *key, hash_item_iterate_cb func, void *data) +EXPORT_SYMBOL(lustre_hash_lookup); + +/** + * For each item in the lustre hash @lh call the passed callback @func + * and pass to it as an argument each hash 
item and the private @data. + * Before each callback ops->lh_get will be called, and after each + * callback ops->lh_put will be called. Finally, during the callback + * the bucket lock is held so the callback must never sleep. + */ +void +lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data) { - int hashent, find = 0; - struct lustre_hash_bucket *bucket = NULL; - struct hlist_node *hash_item_node = NULL; - struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; - struct obd_export *tmp = NULL; - + struct hlist_node *hnode; + lustre_hash_bucket_t *lhb; + void *obj; + int i; ENTRY; - - hashent = hop->lustre_hashfn(hash_body, key); - bucket = &hash_body->lchb_hash_tables[hashent]; - - spin_lock(&bucket->lhb_lock); - hlist_for_each(hash_item_node, &(bucket->lhb_head)) { - find = hop->lustre_hash_key_compare(key, hash_item_node); - if (find) { - tmp = hop->lustre_hash_object_refcount_get(hash_item_node); - func(tmp, data); - hop->lustre_hash_object_refcount_put(hash_item_node); + + read_lock(&lh->lh_rwlock); + lh_for_each_bucket(lh, lhb, i) { + read_lock(&lhb->lhb_rwlock); + hlist_for_each(hnode, &(lhb->lhb_head)) { + __lustre_hash_bucket_validate(lh, lhb, hnode); + obj = lh_get(lh, hnode); + func(obj, data); + (void)lh_put(lh, hnode); } + read_unlock(&lhb->lhb_rwlock); } - spin_unlock(&bucket->lhb_lock); -} -EXPORT_SYMBOL(lustre_hash_bucket_iterate); + read_unlock(&lh->lh_rwlock); -void lustre_hash_iterate_all(struct lustre_class_hash_body *hash_body, - hash_item_iterate_cb func, void *data) + EXIT; +} +EXPORT_SYMBOL(lustre_hash_for_each); + +/** + * For each item in the lustre hash @lh call the passed callback @func + * and pass to it as an argument each hash item and the private @data. + * Before each callback ops->lh_get will be called, and after each + * callback ops->lh_put will be called. During the callback the + * bucket lock will not be held will allows for the current item + * to be removed from the hash during the callback. 
However, care + * should be taken to prevent other callers from operating on the + * hash concurrently or list corruption may occur. + */ +void +lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data) { - int i; - struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; + struct hlist_node *hnode; + struct hlist_node *pos; + lustre_hash_bucket_t *lhb; + void *obj; + int i; ENTRY; - - for( i = 0; i < hash_body->lchb_hash_max_size; i++ ) { - struct lustre_hash_bucket * bucket; - struct hlist_node * actual_hnode, *pos; - void *obj; - - bucket = &hash_body->lchb_hash_tables[i]; -#ifdef LUSTRE_HASH_DEBUG - CDEBUG(D_INFO, "idx %d - bucket %p\n", i, bucket); -#endif - spin_lock(&bucket->lhb_lock); /* lock the bucket */ - hlist_for_each_safe(actual_hnode, pos, &(bucket->lhb_head)) { - obj = hop->lustre_hash_object_refcount_get(actual_hnode); + + read_lock(&lh->lh_rwlock); + lh_for_each_bucket(lh, lhb, i) { + read_lock(&lhb->lhb_rwlock); + hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) { + __lustre_hash_bucket_validate(lh, lhb, hnode); + obj = lh_get(lh, hnode); + read_unlock(&lhb->lhb_rwlock); func(obj, data); - hop->lustre_hash_object_refcount_put(actual_hnode); + read_lock(&lhb->lhb_rwlock); + (void)lh_put(lh, hnode); } - spin_unlock(&bucket->lhb_lock); + read_unlock(&lhb->lhb_rwlock); } + read_unlock(&lh->lh_rwlock); EXIT; } -EXPORT_SYMBOL(lustre_hash_iterate_all); - - -void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body, - void *key) +EXPORT_SYMBOL(lustre_hash_for_each_safe); + +/** + * For each hash bucket in the lustre hash @lh call the passed callback + * @func until all the hash buckets are empty. The passed callback @func + * or the previously registered callback lh->lh_put must remove the item + * from the hash. You may either use the lustre_hash_del() or hlist_del() + * functions. No rwlocks will be held during the callback @func it is + * safe to sleep if needed. 
This function will not terminate until the + * hash is empty. Note it is still possible to concurrently add new + * items in to the hash. It is the callers responsibility to ensure + * the required locking is in place to prevent concurrent insertions. + */ +void +lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data) { - int hashent ; - struct hlist_node * hash_item_hnode = NULL; - void * obj_value = NULL; - struct lustre_hash_bucket *bucket = NULL; - struct lustre_hash_operations * hop = hash_body->lchb_hash_operations; + struct hlist_node *hnode; + lustre_hash_bucket_t *lhb; + void *obj; + int i; ENTRY; - - /* get the hash value from the given item */ - hashent = hop->lustre_hashfn(hash_body, key); - - bucket = &hash_body->lchb_hash_tables[hashent]; - spin_lock(&bucket->lhb_lock); /* lock the bucket */ - - hash_item_hnode = lustre_hash_getitem_in_bucket_nolock(hash_body, - hashent, key); - - if (hash_item_hnode == NULL) { - spin_unlock(&bucket->lhb_lock); /* lock the bucket */ - RETURN(NULL); + +restart: + read_lock(&lh->lh_rwlock); + lh_for_each_bucket(lh, lhb, i) { + write_lock(&lhb->lhb_rwlock); + while (!hlist_empty(&lhb->lhb_head)) { + hnode = lhb->lhb_head.first; + __lustre_hash_bucket_validate(lh, lhb, hnode); + obj = lh_get(lh, hnode); + write_unlock(&lhb->lhb_rwlock); + read_unlock(&lh->lh_rwlock); + func(obj, data); + (void)lh_put(lh, hnode); + goto restart; + } + write_unlock(&lhb->lhb_rwlock); } - - obj_value = hop->lustre_hash_object_refcount_get(hash_item_hnode); - spin_unlock(&bucket->lhb_lock); /* lock the bucket */ - - RETURN(obj_value); -} -EXPORT_SYMBOL(lustre_hash_get_object_by_key); - -/* string hashing using djb2 hash algorithm */ -__u32 djb2_hashfn(struct lustre_class_hash_body *hash_body, void* key, - size_t size) -{ - __u32 hash = 5381; - int i; - char *ptr = key; - - LASSERT(key != NULL); - - for (i=0; i<size; i++) - hash = hash * 33 + ptr[i]; - - hash &= (hash_body->lchb_hash_max_size - 1); - - RETURN(hash); -} - 
-/* - * define (uuid <-> export) hash operations and function define - */ - -/* define the uuid hash operations */ -struct lustre_hash_operations uuid_hash_operations = { - .lustre_hashfn = uuid_hashfn, - .lustre_hash_key_compare = uuid_hash_key_compare, - .lustre_hash_object_refcount_get = uuid_export_refcount_get, - .lustre_hash_object_refcount_put = uuid_export_refcount_put, -}; - -__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body, void * key) -{ - struct obd_uuid * uuid_key = key; - - return djb2_hashfn(hash_body, uuid_key->uuid, sizeof(uuid_key->uuid)); -} - -/* Note, it is impossible to find an export that is in failed state with - * this function */ -int uuid_hash_key_compare(void *key, struct hlist_node *compared_hnode) -{ - struct obd_export *export = NULL; - struct obd_uuid *uuid_key = NULL, *compared_uuid = NULL; - - LASSERT( key != NULL); - - uuid_key = (struct obd_uuid*)key; - - export = hlist_entry(compared_hnode, struct obd_export, exp_uuid_hash); - - compared_uuid = &export->exp_client_uuid; - - RETURN(obd_uuid_equals(uuid_key, compared_uuid) && - !export->exp_failed); -} - -void * uuid_export_refcount_get(struct hlist_node * actual_hnode) -{ - struct obd_export *export = NULL; - - LASSERT(actual_hnode != NULL); - - export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash); - - LASSERT(export != NULL); - - class_export_get(export); - - RETURN(export); + read_unlock(&lh->lh_rwlock); + EXIT; } - -void uuid_export_refcount_put(struct hlist_node * actual_hnode) +EXPORT_SYMBOL(lustre_hash_for_each_empty); + + /* + * For each item in the lustre hash @lh which matches the @key call + * the passed callback @func and pass to it as an argument each hash + * item and the private @data. Before each callback ops->lh_get will + * be called, and after each callback ops->lh_put will be called. + * Finally, during the callback the bucket lock is held so the + * callback must never sleep. 
+ */ +void +lustre_hash_for_each_key(lustre_hash_t *lh, void *key, + lh_for_each_cb func, void *data) { - struct obd_export *export = NULL; - - LASSERT(actual_hnode != NULL); - - export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash); - - LASSERT(export != NULL); - - class_export_put(export); + struct hlist_node *hnode; + lustre_hash_bucket_t *lhb; + unsigned i; + ENTRY; + + read_lock(&lh->lh_rwlock); + i = lh_hash(lh, key, lh->lh_cur_size - 1); + lhb = &lh->lh_buckets[i]; + LASSERT(i < lh->lh_cur_size); + + read_lock(&lhb->lhb_rwlock); + hlist_for_each(hnode, &(lhb->lhb_head)) { + __lustre_hash_bucket_validate(lh, lhb, hnode); + + if (!lh_compare(lh, key, hnode)) + continue; + + func(lh_get(lh, hnode), data); + (void)lh_put(lh, hnode); + } + + read_unlock(&lhb->lhb_rwlock); + read_unlock(&lh->lh_rwlock); + + EXIT; } - -/* - * define (nid <-> export) hash operations and function define +EXPORT_SYMBOL(lustre_hash_for_each_key); + +/** + * Rehash the lustre hash @lh to the given @size. This can be used + * to grow the hash size when excessive chaining is detected, or to + * shrink the hash when it is larger than needed. When the LH_REHASH + * flag is set in @lh the lustre hash may be dynamically rehashed + * during addition or removal if the hash's theta value exceeds + * either the lh->lh_min_theta or lh->max_theta values. By default + * these values are tuned to keep the chained hash depth small, and + * this approach assumes a reasonably uniform hashing function. The + * theta thresholds for @lh are tunable via lustre_hash_set_theta(). 
*/ - -/* define the nid hash operations */ -struct lustre_hash_operations nid_hash_operations = { - .lustre_hashfn = nid_hashfn, - .lustre_hash_key_compare = nid_hash_key_compare, - .lustre_hash_object_refcount_get = nid_export_refcount_get, - .lustre_hash_object_refcount_put = nid_export_refcount_put, -}; - -__u32 nid_hashfn(struct lustre_class_hash_body *hash_body, void * key) -{ - return djb2_hashfn(hash_body, key, sizeof(lnet_nid_t)); -} - -/* Note, it is impossible to find an export that is in failed state with - * this function */ -int nid_hash_key_compare(void *key, struct hlist_node *compared_hnode) +int +lustre_hash_rehash(lustre_hash_t *lh, int size) { - struct obd_export *export = NULL; - lnet_nid_t *nid_key = NULL; - - LASSERT( key != NULL); - - nid_key = (lnet_nid_t*)key; - - export = hlist_entry(compared_hnode, struct obd_export, exp_nid_hash); - - return (export->exp_connection->c_peer.nid == *nid_key && - !export->exp_failed); -} - -void *nid_export_refcount_get(struct hlist_node *actual_hnode) -{ - struct obd_export *export = NULL; - - LASSERT(actual_hnode != NULL); - - export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash); - - LASSERT(export != NULL); - - class_export_get(export); - - RETURN(export); -} - -void nid_export_refcount_put(struct hlist_node *actual_hnode) -{ - struct obd_export *export = NULL; - - LASSERT(actual_hnode != NULL); - - export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash); - - LASSERT(export != NULL); - - class_export_put(export); + struct hlist_node *hnode; + struct hlist_node *pos; + lustre_hash_bucket_t *lh_buckets; + lustre_hash_bucket_t *rehash_buckets; + lustre_hash_bucket_t *lh_lhb; + lustre_hash_bucket_t *rehash_lhb; + int i; + int lh_size; + int theta; + void *key; + ENTRY; + + LASSERT(size > 0); + + OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) * size); + if (!rehash_buckets) + RETURN(-ENOMEM); + + for (i = 0; i < size; i++) { + INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head); + 
rwlock_init(&rehash_buckets[i].lhb_rwlock); + atomic_set(&rehash_buckets[i].lhb_count, 0); + } + + write_lock(&lh->lh_rwlock); + + /* + * Early return for multiple concurrent racing callers, + * ensure we only trigger the rehash if it is still needed. + */ + theta = __lustre_hash_theta(lh); + if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) { + OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) * size); + write_unlock(&lh->lh_rwlock); + RETURN(-EALREADY); + } + + lh_size = lh->lh_cur_size; + lh_buckets = lh->lh_buckets; + + lh->lh_cur_size = size; + lh->lh_buckets = rehash_buckets; + atomic_inc(&lh->lh_rehash_count); + + for (i = 0; i < lh_size; i++) { + lh_lhb = &lh_buckets[i]; + + write_lock(&lh_lhb->lhb_rwlock); + hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) { + key = lh_key(lh, hnode); + LASSERT(key); + + /* + * Validate hnode is in the correct bucket. + */ + if (unlikely(lh->lh_flags & LH_DEBUG)) + LASSERT(lh_hash(lh, key, lh_size - 1) == i); + + /* + * Delete from old hash bucket. + */ + hlist_del(hnode); + LASSERT(atomic_read(&lh_lhb->lhb_count) > 0); + atomic_dec(&lh_lhb->lhb_count); + + /* + * Add to rehash bucket, ops->lh_key must be defined. + */ + rehash_lhb = &rehash_buckets[lh_hash(lh, key, size-1)]; + hlist_add_head(hnode, &(rehash_lhb->lhb_head)); + atomic_inc(&rehash_lhb->lhb_count); + } + + LASSERT(hlist_empty(&(lh_lhb->lhb_head))); + LASSERT(atomic_read(&lh_lhb->lhb_count) == 0); + write_unlock(&lh_lhb->lhb_rwlock); + } + + OBD_VFREE(lh_buckets, sizeof(*lh_buckets) * lh_size); + write_unlock(&lh->lh_rwlock); + + RETURN(0); } - -/* - * define (net_peer <-> connection) hash operations and function define +EXPORT_SYMBOL(lustre_hash_rehash); + +/** + * Rehash the object referenced by @hnode in the lustre hash @lh. The + * @old_key must be provided to locate the objects previous location + * in the hash, and the @new_key will be used to reinsert the object. 
+ * Use this function instead of a lustre_hash_add() + lustre_hash_del() + * combo when it is critical that there is no window in time where the + * object is missing from the hash. When an object is being rehashed + * the registered lh_get() and lh_put() functions will not be called. */ - -/* define the conn hash operations */ -struct lustre_hash_operations conn_hash_operations = { - .lustre_hashfn = conn_hashfn, - .lustre_hash_key_compare = conn_hash_key_compare, - .lustre_hash_object_refcount_get = conn_refcount_get, - .lustre_hash_object_refcount_put = conn_refcount_put, -}; -EXPORT_SYMBOL(conn_hash_operations); - -__u32 conn_hashfn(struct lustre_class_hash_body *hash_body, void * key) -{ - return djb2_hashfn(hash_body, key, sizeof(lnet_process_id_t)); -} - -int conn_hash_key_compare(void *key, struct hlist_node *compared_hnode) +void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key, + struct hlist_node *hnode) { - struct ptlrpc_connection *c = NULL; - lnet_process_id_t *conn_key = NULL; - - LASSERT( key != NULL); - - conn_key = (lnet_process_id_t*)key; - - c = hlist_entry(compared_hnode, struct ptlrpc_connection, c_hash); - - return (conn_key->nid == c->c_peer.nid && - conn_key->pid == c->c_peer.pid); -} - -void *conn_refcount_get(struct hlist_node *actual_hnode) -{ - struct ptlrpc_connection *c = NULL; - - LASSERT(actual_hnode != NULL); - - c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash); - - LASSERT(c != NULL); - - atomic_inc(&c->c_refcount); - - RETURN(c); -} - -void conn_refcount_put(struct hlist_node *actual_hnode) -{ - struct ptlrpc_connection *c = NULL; - - LASSERT(actual_hnode != NULL); - - c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash); - - LASSERT(c != NULL); - - atomic_dec(&c->c_refcount); -} - -/*******************************************************************************/ -/* ( nid<>nidstats ) hash operations define */ - -struct lustre_hash_operations nid_stat_hash_operations = { - 
.lustre_hashfn = nid_hashfn, - .lustre_hash_key_compare = nidstats_hash_key_compare, - .lustre_hash_object_refcount_get = nidstats_refcount_get, - .lustre_hash_object_refcount_put = nidstats_refcount_put, -}; -EXPORT_SYMBOL(nid_stat_hash_operations); - -int nidstats_hash_key_compare(void *key, struct hlist_node * compared_hnode) -{ - struct nid_stat *data; - lnet_nid_t *nid_key; - - LASSERT( key != NULL); - - nid_key = (lnet_nid_t*)key; - data = hlist_entry(compared_hnode, struct nid_stat, nid_hash); - - return (data->nid == *nid_key); -} - -void* nidstats_refcount_get(struct hlist_node * actual_hnode) -{ - struct nid_stat *data; - - data = hlist_entry(actual_hnode, struct nid_stat, nid_hash); - data->nid_exp_ref_count++; - - RETURN(data); -} - -void nidstats_refcount_put(struct hlist_node * actual_hnode) -{ - struct nid_stat *data; - - data = hlist_entry(actual_hnode, struct nid_stat, nid_hash); - data->nid_exp_ref_count--; + lustre_hash_bucket_t *old_lhb; + lustre_hash_bucket_t *new_lhb; + unsigned i; + int j; + ENTRY; + + __lustre_hash_key_validate(lh, new_key, hnode); + LASSERT(!hlist_unhashed(hnode)); + + read_lock(&lh->lh_rwlock); + + i = lh_hash(lh, old_key, lh->lh_cur_size - 1); + old_lhb = &lh->lh_buckets[i]; + LASSERT(i < lh->lh_cur_size); + + j = lh_hash(lh, new_key, lh->lh_cur_size - 1); + new_lhb = &lh->lh_buckets[j]; + LASSERT(j < lh->lh_cur_size); + + write_lock(&old_lhb->lhb_rwlock); + write_lock(&new_lhb->lhb_rwlock); + + /* + * Migrate item between hash buckets without calling + * the lh_get() and lh_put() callback functions. 
+ */ + hlist_del(hnode); + LASSERT(atomic_read(&old_lhb->lhb_count) > 0); + atomic_dec(&old_lhb->lhb_count); + hlist_add_head(hnode, &(new_lhb->lhb_head)); + atomic_inc(&new_lhb->lhb_count); + + write_unlock(&new_lhb->lhb_rwlock); + write_unlock(&old_lhb->lhb_rwlock); + read_unlock(&lh->lh_rwlock); + EXIT; } - -/*******************************************************************************/ - -#if defined(__KERNEL__) && defined(HAVE_QUOTA_SUPPORT) -/* - * define ( lqs <-> qctxt ) hash operations and function define - */ - -/* define the conn hash operations */ -struct lustre_hash_operations lqs_hash_operations = { - .lustre_hashfn = lqs_hashfn, - .lustre_hash_key_compare = lqs_hash_key_compare, - .lustre_hash_object_refcount_get = lqs_refcount_get, - .lustre_hash_object_refcount_put = lqs_refcount_put, -}; -EXPORT_SYMBOL(lqs_hash_operations); - -/* string hashing using djb2 hash algorithm */ -__u32 lqs_hashfn(struct lustre_class_hash_body *hash_body, void * key) -{ - struct quota_adjust_qunit *lqs_key = NULL; - __u32 hash; - - LASSERT(key != NULL); - - lqs_key = (struct quota_adjust_qunit *)key; - - hash = QAQ_IS_GRP(lqs_key) ? 
5381 : 5387; - hash *= lqs_key->qaq_id; - - hash &= (hash_body->lchb_hash_max_size - 1); - - RETURN(hash); -} - -int lqs_hash_key_compare(void *key, struct hlist_node *compared_hnode) -{ - struct quota_adjust_qunit *lqs_key = NULL; - struct lustre_qunit_size *q = NULL; - int retval = 0; - - LASSERT( key != NULL); - - lqs_key = (struct quota_adjust_qunit *)key; - - q = hlist_entry(compared_hnode, struct lustre_qunit_size, lqs_hash); - - spin_lock(&q->lqs_lock); - if (lqs_key->qaq_id == q->lqs_id && QAQ_IS_GRP(lqs_key) == LQS_IS_GRP(q)) - retval = 1; - spin_unlock(&q->lqs_lock); - - return retval; -} - -void * lqs_refcount_get(struct hlist_node * actual_hnode) +EXPORT_SYMBOL(lustre_hash_rehash_key); + +int lustre_hash_debug_header(char *str, int size) { - struct lustre_qunit_size *q = NULL; - - LASSERT(actual_hnode != NULL); - - q = hlist_entry(actual_hnode, struct lustre_qunit_size, lqs_hash); - - LASSERT(q != NULL); - - lqs_getref(q); - - RETURN(q); + return snprintf(str, size, + "%-36s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n", + "name", "cur", "min", "max", "theta", "t-min", "t-max", + "flags", "rehash", "count", " distribution"); } +EXPORT_SYMBOL(lustre_hash_debug_header); -void lqs_refcount_put(struct hlist_node * actual_hnode) +int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size) { - struct lustre_qunit_size *q = NULL; - - LASSERT(actual_hnode != NULL); - - q = hlist_entry(actual_hnode, struct lustre_qunit_size, lqs_hash); - - LASSERT(q != NULL); - - lqs_putref(q); + lustre_hash_bucket_t *lhb; + int theta; + int i; + int c = 0; + int dist[8] = { 0, }; + + if (str == NULL || size == 0) + return 0; + + read_lock(&lh->lh_rwlock); + theta = __lustre_hash_theta(lh); + + c += snprintf(str + c, size - c, "%-36s ",lh->lh_name); + c += snprintf(str + c, size - c, "%5d ", lh->lh_cur_size); + c += snprintf(str + c, size - c, "%5d ", lh->lh_min_size); + c += snprintf(str + c, size - c, "%5d ", lh->lh_max_size); + c += snprintf(str + c, size - c, "%d.%03d ", + theta / 
1000, theta % 1000); + c += snprintf(str + c, size - c, "%d.%03d ", + lh->lh_min_theta / 1000, lh->lh_min_theta % 1000); + c += snprintf(str + c, size - c, "%d.%03d ", + lh->lh_max_theta / 1000, lh->lh_max_theta % 1000); + c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags); + c += snprintf(str + c, size - c, "%6d ", + atomic_read(&lh->lh_rehash_count)); + c += snprintf(str + c, size - c, "%5d ", + atomic_read(&lh->lh_count)); + + /* + * The distribution is a summary of the chained hash depth in + * each of the lustre hash buckets. Each buckets lhb_count is + * divided by the hash theta value and used to generate a + * histogram of the hash distribution. A uniform hash will + * result in all hash buckets being close to the average thus + * only the first few entries in the histogram will be non-zero. + * If you hash function results in a non-uniform hash the will + * be observable by outlier bucks in the distribution histogram. + * + * Uniform hash distribution: 128/128/0/0/0/0/0/0 + * Non-Uniform hash distribution: 128/125/0/0/0/0/2/1 + */ + lh_for_each_bucket(lh, lhb, i) + dist[MIN(__fls(atomic_read(&lhb->lhb_count)/MAX(theta,1)),7)]++; + + for (i = 0; i < 8; i++) + c += snprintf(str + c, size - c, "%d%c", dist[i], + (i == 7) ? 
'\n' : '/'); + + read_unlock(&lh->lh_rwlock); + + return c; } -#endif +EXPORT_SYMBOL(lustre_hash_debug_str); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 14cd8fd9c518aacbbfc5c8a319b6b55bcb0c8194..76d56762c3a2b36c3efbc15790239e8116274e2d 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -697,15 +697,12 @@ struct obd_export *class_new_export(struct obd_device *obd, return ERR_PTR(-ENOMEM); export->exp_conn_cnt = 0; + export->exp_lock_hash = NULL; atomic_set(&export->exp_refcount, 2); atomic_set(&export->exp_rpc_count, 0); export->exp_obd = obd; CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies); CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue); - /* XXX this should be in LDLM init */ - CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks); - spin_lock_init(&export->exp_ldlm_data.led_lock); - CFS_INIT_LIST_HEAD(&export->exp_handle.h_link); class_handle_hash(&export->exp_handle, export_handle_addref); export->exp_last_request_time = cfs_time_current_sec(); @@ -717,15 +714,15 @@ struct obd_export *class_new_export(struct obd_device *obd, obd_init_export(export); if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) { - rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid, - &export->exp_uuid_hash); - if (rc != 0) { - CWARN("%s: denying duplicate export for %s\n", - obd->obd_name, cluuid->uuid); - class_handle_unhash(&export->exp_handle); - OBD_FREE_PTR(export); - return ERR_PTR(-EALREADY); - } + rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid, + &export->exp_uuid_hash); + if (rc != 0) { + LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n", + obd->obd_name, cluuid->uuid, rc); + class_handle_unhash(&export->exp_handle); + OBD_FREE_PTR(export); + return ERR_PTR(-EALREADY); + } } spin_lock(&obd->obd_dev_lock); @@ -747,10 +744,11 @@ void class_unlink_export(struct obd_export *exp) spin_lock(&exp->exp_obd->obd_dev_lock); /* delete an uuid-export hashitem from hashtables */ - if 
(!hlist_unhashed(&exp->exp_uuid_hash)) { - lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body, - &exp->exp_client_uuid, &exp->exp_uuid_hash); - } + if (!hlist_unhashed(&exp->exp_uuid_hash)) + lustre_hash_del(exp->exp_obd->obd_uuid_hash, + &exp->exp_client_uuid, + &exp->exp_uuid_hash); + list_del_init(&exp->exp_obd_chain); list_del_init(&exp->exp_obd_chain_timed); exp->exp_obd->obd_num_exports--; @@ -941,10 +939,11 @@ int class_disconnect(struct obd_export *export) already_disconnected = export->exp_disconnected; export->exp_disconnected = 1; - if (!hlist_unhashed(&export->exp_nid_hash)) { - lustre_hash_delitem(export->exp_obd->obd_nid_hash_body, - &export->exp_connection->c_peer.nid, &export->exp_nid_hash); - } + if (!hlist_unhashed(&export->exp_nid_hash)) + lustre_hash_del(export->exp_obd->obd_nid_hash, + &export->exp_connection->c_peer.nid, + &export->exp_nid_hash); + spin_unlock(&export->exp_lock); /* class_cleanup(), abort_recovery(), and class_fail_export() @@ -1255,8 +1254,7 @@ int obd_export_evict_by_nid(struct obd_device *obd, char *nid) lnet_nid_t nid_key = libcfs_str2nid(nid); do { - doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body, - &nid_key); + doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key); if (doomed_exp == NULL) break; @@ -1284,17 +1282,16 @@ EXPORT_SYMBOL(obd_export_evict_by_nid); int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid) { struct obd_export *doomed_exp = NULL; - struct obd_uuid doomed; + struct obd_uuid doomed_uuid; int exports_evicted = 0; - obd_str2uuid(&doomed, uuid); - if(obd_uuid_equals(&doomed, &obd->obd_uuid)) { + obd_str2uuid(&doomed_uuid, uuid); + if(obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) { CERROR("%s: can't evict myself\n", obd->obd_name); return exports_evicted; } - doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body, - &doomed); + doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid); if (doomed_exp == NULL) { CERROR("%s: can't disconnect %s: no 
exports found\n", diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 70bb76c63aa057196c861a97c0b6eba05de12702..f56b1a381c6336fee2cdd3056143543e1e76ce3e 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -1267,6 +1267,16 @@ struct exp_uuid_cb_data { int *len; }; +static void +lprocfs_exp_rd_cb_data_init(struct exp_uuid_cb_data *cb_data, char *page, + int count, int *eof, int *len) +{ + cb_data->page = page; + cb_data->count = count; + cb_data->eof = eof; + cb_data->len = len; +} + void lprocfs_exp_print_uuid(void *obj, void *cb_data) { struct obd_export *exp = (struct obd_export *)obj; @@ -1288,15 +1298,43 @@ int lprocfs_exp_rd_uuid(char *page, char **start, off_t off, int count, *eof = 1; page[0] = '\0'; - LASSERT(obd != NULL); + lprocfs_exp_rd_cb_data_init(&cb_data, page, count, eof, &len); + lustre_hash_for_each_key(obd->obd_nid_hash, &stats->nid, + lprocfs_exp_print_uuid, &cb_data); + return (*cb_data.len); +} + +void lprocfs_exp_print_hash(void *obj, void *cb_data) +{ + struct obd_export *exp = (struct obd_export *)obj; + struct exp_uuid_cb_data *data = (struct exp_uuid_cb_data *)cb_data; + lustre_hash_t *lh; + + lh = exp->exp_lock_hash; + if (lh) { + if (!*data->len) + *data->len += lustre_hash_debug_header(data->page, + data->count); + + *data->len += lustre_hash_debug_str(lh, data->page + + *data->len, + data->count); + } +} + +int lprocfs_exp_rd_hash(char *page, char **start, off_t off, int count, + int *eof, void *data) +{ + struct nid_stat *stats = (struct nid_stat *)data; + struct exp_uuid_cb_data cb_data; + struct obd_device *obd = stats->nid_obd; + int len = 0; - cb_data.page = page; - cb_data.count = count; - cb_data.eof = eof; - cb_data.len = &len; - lustre_hash_bucket_iterate(obd->obd_nid_hash_body, - &stats->nid, lprocfs_exp_print_uuid, - &cb_data); + *eof = 1; + page[0] = '\0'; + lprocfs_exp_rd_cb_data_init(&cb_data, page, count, eof, &len); + 
lustre_hash_for_each_key(obd->obd_nid_hash, &stats->nid, + lprocfs_exp_print_hash, &cb_data); return (*cb_data.len); } @@ -1340,7 +1378,6 @@ void lprocfs_nid_stats_clear_write_cb(void *obj, void *data) return; } - int lprocfs_nid_stats_clear_write(struct file *file, const char *buffer, unsigned long count, void *data) { @@ -1348,8 +1385,8 @@ int lprocfs_nid_stats_clear_write(struct file *file, const char *buffer, struct nid_stat *client_stat; CFS_LIST_HEAD(free_list); - lustre_hash_iterate_all(obd->obd_nid_stats_hash_body, - lprocfs_nid_stats_clear_write_cb, &free_list); + lustre_hash_for_each(obd->obd_nid_stats_hash, + lprocfs_nid_stats_clear_write_cb, &free_list); while (!list_empty(&free_list)) { client_stat = list_entry(free_list.next, struct nid_stat, nid_list); @@ -1372,7 +1409,7 @@ int lprocfs_exp_setup(struct obd_export *exp, lnet_nid_t *nid, int *newnid) *newnid = 0; if (!exp || !exp->exp_obd || !exp->exp_obd->obd_proc_exports_entry || - !exp->exp_obd->obd_nid_stats_hash_body) + !exp->exp_obd->obd_nid_stats_hash) RETURN(-EINVAL); /* not test against zero because eric say: @@ -1383,7 +1420,7 @@ int lprocfs_exp_setup(struct obd_export *exp, lnet_nid_t *nid, int *newnid) obd = exp->exp_obd; - CDEBUG(D_CONFIG, "using hash %p\n", obd->obd_nid_stats_hash_body); + CDEBUG(D_CONFIG, "using hash %p\n", obd->obd_nid_stats_hash); OBD_ALLOC(tmp, sizeof(struct nid_stat)); if (tmp == NULL) @@ -1408,8 +1445,8 @@ int lprocfs_exp_setup(struct obd_export *exp, lnet_nid_t *nid, int *newnid) list_add(&tmp->nid_list, &obd->obd_nid_stats); spin_unlock(&obd->obd_nid_lock); - tmp1 = lustre_hash_findadd_unique(obd->obd_nid_stats_hash_body, nid, - &tmp->nid_hash); + tmp1 = lustre_hash_findadd_unique(obd->obd_nid_stats_hash, + nid, &tmp->nid_hash); CDEBUG(D_INFO, "Found stats %p for nid %s - ref %d\n", tmp1, libcfs_nid2str(*nid), tmp->nid_exp_ref_count); @@ -1447,8 +1484,7 @@ int lprocfs_exp_setup(struct obd_export *exp, lnet_nid_t *nid, int *newnid) if (!tmp->nid_proc) { 
CERROR("Error making export directory for" " nid %s\n", libcfs_nid2str(*nid)); - lustre_hash_delitem(obd->obd_nid_stats_hash_body, nid, - &tmp->nid_hash); + lustre_hash_del(obd->obd_nid_stats_hash, nid, &tmp->nid_hash); GOTO(destroy_new, rc = -ENOMEM); } @@ -1462,6 +1498,11 @@ int lprocfs_exp_setup(struct obd_export *exp, lnet_nid_t *nid, int *newnid) if (rc) CWARN("Error adding the uuid file\n"); + rc = lprocfs_add_simple(tmp->nid_proc, "hash", + lprocfs_exp_rd_hash, NULL, tmp); + if (rc) + CWARN("Error adding the hash file\n"); + exp->exp_nid_stats = tmp; *newnid = 1; RETURN(rc); @@ -1842,6 +1883,24 @@ out: } EXPORT_SYMBOL(lprocfs_obd_rd_recovery_status); +int lprocfs_obd_rd_hash(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = data; + int c = 0; + + if (obd == NULL) + return 0; + + c += lustre_hash_debug_header(page, count); + c += lustre_hash_debug_str(obd->obd_uuid_hash, page + c, count - c); + c += lustre_hash_debug_str(obd->obd_nid_hash, page + c, count - c); + c += lustre_hash_debug_str(obd->obd_nid_stats_hash, page+c, count-c); + + return c; +} +EXPORT_SYMBOL(lprocfs_obd_rd_hash); + #ifdef CRAY_XT3 int lprocfs_obd_rd_recovery_maxtime(char *page, char **start, off_t off, int count, int *eof, void *data) diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 565b0f15d0d0dcc0480ae6a0d533a24fb8db637f..8799717666fc9012f6d3bd94e15e924db63784a2 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -53,8 +53,9 @@ #include <lustre_param.h> #include <class_hash.h> -extern struct lustre_hash_operations uuid_hash_operations; -extern struct lustre_hash_operations nid_hash_operations; +static lustre_hash_ops_t uuid_hash_ops; +static lustre_hash_ops_t nid_hash_ops; +static lustre_hash_ops_t nid_stat_hash_ops; /*********** string parsing utils *********/ @@ -296,26 +297,28 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg) /* just leave this on forever. 
I can't use obd_set_up here because other fns check that status, and we're not actually set up yet. */ obd->obd_starting = 1; + obd->obd_uuid_hash = NULL; + obd->obd_nid_hash = NULL; + obd->obd_nid_stats_hash = NULL; spin_unlock(&obd->obd_dev_lock); - /* create an uuid-export hash body */ - err = lustre_hash_init(&obd->obd_uuid_hash_body, "UUID_HASH", - 128, &uuid_hash_operations); - if (err) - GOTO(err_hash, err); - - /* create a nid-export hash body */ - err = lustre_hash_init(&obd->obd_nid_hash_body, "NID_HASH", - 128, &nid_hash_operations); - if (err) - GOTO(err_hash, err); + /* create an uuid-export lustre hash */ + obd->obd_uuid_hash = lustre_hash_init("UUID_HASH", 128, 128, + &uuid_hash_ops, 0); + if (!obd->obd_uuid_hash) + GOTO(err_hash, -ENOMEM); - /* create a nid-stats hash body */ - err = lustre_hash_init(&obd->obd_nid_stats_hash_body, "NID_STATS", - 128, &nid_stat_hash_operations); - if (err) - GOTO(err_hash, err); + /* create a nid-export lustre hash */ + obd->obd_nid_hash = lustre_hash_init("NID_HASH", 128, 128, + &nid_hash_ops, 0); + if (!obd->obd_nid_hash) + GOTO(err_hash, -ENOMEM); + /* create a nid-stats lustre hash */ + obd->obd_nid_stats_hash = lustre_hash_init("NID_STATS", 128, 128, + &nid_stat_hash_ops, 0); + if (!obd->obd_nid_stats_hash) + GOTO(err_hash, -ENOMEM); exp = class_new_export(obd, &obd->obd_uuid); if (IS_ERR(exp)) @@ -343,9 +346,9 @@ err_exp: class_unlink_export(obd->obd_self_export); obd->obd_self_export = NULL; err_hash: - lustre_hash_exit(&obd->obd_uuid_hash_body); - lustre_hash_exit(&obd->obd_nid_hash_body); - lustre_hash_exit(&obd->obd_nid_stats_hash_body); + lustre_hash_exit(obd->obd_uuid_hash); + lustre_hash_exit(obd->obd_nid_hash); + lustre_hash_exit(obd->obd_nid_stats_hash); obd->obd_starting = 0; CERROR("setup %s failed (%d)\n", obd->obd_name, err); RETURN(err); @@ -479,13 +482,13 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg) LASSERT(obd->obd_self_export); /* destroy an uuid-export hash body */ - 
lustre_hash_exit(&obd->obd_uuid_hash_body); + lustre_hash_exit(obd->obd_uuid_hash); /* destroy a nid-export hash body */ - lustre_hash_exit(&obd->obd_nid_hash_body); + lustre_hash_exit(obd->obd_nid_hash); /* destroy a nid-stats hash body */ - lustre_hash_exit(&obd->obd_nid_stats_hash_body); + lustre_hash_exit(obd->obd_nid_stats_hash); /* Precleanup stage 1, we must make sure all exports (other than the self-export) get destroyed. */ @@ -1260,3 +1263,188 @@ out: lustre_cfg_free(lcfg); RETURN(rc); } + +/* + * uuid<->export lustre hash operations + */ + +static unsigned +uuid_hash(lustre_hash_t *lh, void *key, unsigned mask) +{ + return lh_djb2_hash(((struct obd_uuid *)key)->uuid, + sizeof(((struct obd_uuid *)key)->uuid), mask); +} + +static void * +uuid_key(struct hlist_node *hnode) +{ + struct obd_export *exp; + + exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash); + + RETURN(&exp->exp_client_uuid); +} + +/* + * NOTE: It is impossible to find an export that is in failed + * state with this function + */ +static int +uuid_compare(void *key, struct hlist_node *hnode) +{ + struct obd_export *exp; + + LASSERT(key); + exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash); + + RETURN(obd_uuid_equals((struct obd_uuid *)key,&exp->exp_client_uuid) && + !exp->exp_failed); +} + +static void * +uuid_export_get(struct hlist_node *hnode) +{ + struct obd_export *exp; + + exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash); + class_export_get(exp); + + RETURN(exp); +} + +static void * +uuid_export_put(struct hlist_node *hnode) +{ + struct obd_export *exp; + + exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash); + class_export_put(exp); + + RETURN(exp); +} + +static lustre_hash_ops_t uuid_hash_ops = { + .lh_hash = uuid_hash, + .lh_key = uuid_key, + .lh_compare = uuid_compare, + .lh_get = uuid_export_get, + .lh_put = uuid_export_put, +}; + + +/* + * nid<->export hash operations + */ + +static unsigned +nid_hash(lustre_hash_t *lh, void *key, unsigned mask) 
+{ + return lh_djb2_hash(key, sizeof(lnet_nid_t), mask); +} + +static void * +nid_key(struct hlist_node *hnode) +{ + struct obd_export *exp; + + exp = hlist_entry(hnode, struct obd_export, exp_nid_hash); + + RETURN(&exp->exp_connection->c_peer.nid); +} + +/* + * NOTE: It is impossible to find an export that is in failed + * state with this function + */ +static int +nid_compare(void *key, struct hlist_node *hnode) +{ + struct obd_export *exp; + + LASSERT(key); + exp = hlist_entry(hnode, struct obd_export, exp_nid_hash); + + RETURN(exp->exp_connection->c_peer.nid == *(lnet_nid_t *)key && + !exp->exp_failed); +} + +static void * +nid_export_get(struct hlist_node *hnode) +{ + struct obd_export *exp; + + exp = hlist_entry(hnode, struct obd_export, exp_nid_hash); + class_export_get(exp); + + RETURN(exp); +} + +static void * +nid_export_put(struct hlist_node *hnode) +{ + struct obd_export *exp; + + exp = hlist_entry(hnode, struct obd_export, exp_nid_hash); + class_export_put(exp); + + RETURN(exp); +} + +static lustre_hash_ops_t nid_hash_ops = { + .lh_hash = nid_hash, + .lh_key = nid_key, + .lh_compare = nid_compare, + .lh_get = nid_export_get, + .lh_put = nid_export_put, +}; + + +/* + * nid<->nidstats hash operations + */ + +static void * +nidstats_key(struct hlist_node *hnode) +{ + struct nid_stat *ns; + + ns = hlist_entry(hnode, struct nid_stat, nid_hash); + + RETURN(&ns->nid); +} + +static int +nidstats_compare(void *key, struct hlist_node *hnode) +{ + RETURN(*(lnet_nid_t *)nidstats_key(hnode) == *(lnet_nid_t *)key); +} + +static void * +nidstats_get(struct hlist_node *hnode) +{ + struct nid_stat *ns; + + ns = hlist_entry(hnode, struct nid_stat, nid_hash); + ns->nid_exp_ref_count++; + + RETURN(ns); +} + +static void * +nidstats_put(struct hlist_node *hnode) +{ + struct nid_stat *ns; + + ns = hlist_entry(hnode, struct nid_stat, nid_hash); + ns->nid_exp_ref_count--; + + RETURN(ns); +} + +static lustre_hash_ops_t nid_stat_hash_ops = { + .lh_hash = nid_hash, + .lh_key = 
nidstats_key, + .lh_compare = nidstats_compare, + .lh_get = nidstats_get, + .lh_put = nidstats_put, +}; diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index d231e5f775ba477a506210817ba710b4a0e804ca..bbd234e31bf2f3fc09d364293a1b8f7a73a27938 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -96,11 +96,17 @@ static int echo_disconnect(struct obd_export *exp) return class_disconnect(exp); } +static int echo_init_export(struct obd_export *exp) +{ + return ldlm_init_export(exp); +} + static int echo_destroy_export(struct obd_export *exp) { ENTRY; target_destroy_export(exp); + ldlm_destroy_export(exp); RETURN(0); } @@ -539,6 +545,7 @@ static struct obd_ops echo_obd_ops = { .o_owner = THIS_MODULE, .o_connect = echo_connect, .o_disconnect = echo_disconnect, + .o_init_export = echo_init_export, .o_destroy_export = echo_destroy_export, .o_create = echo_create, .o_destroy = echo_destroy, diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 80ea370a6e570dc6e14f3f1e6d11f589684b1e9a..2573ceea404ab04bda7b0a0c817dc96a9e136a85 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -615,7 +615,7 @@ static int filter_init_export(struct obd_export *exp) exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); - return 0; + return ldlm_init_export(exp); } static int filter_free_server_data(struct filter_obd *filter) @@ -2398,6 +2398,7 @@ static int filter_destroy_export(struct obd_export *exp) lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd); target_destroy_export(exp); + ldlm_destroy_export(exp); if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) RETURN(0); diff --git a/lustre/obdfilter/lproc_obdfilter.c b/lustre/obdfilter/lproc_obdfilter.c index 700323f9a0cb1212e2afc2f7f05adbbe122d9ce8..de5ded8780c75cfb279a04f79a9574738797e2eb 100644 --- a/lustre/obdfilter/lproc_obdfilter.c +++ b/lustre/obdfilter/lproc_obdfilter.c @@ -203,6 +203,7 @@ static struct lprocfs_vars lprocfs_filter_obd_vars[] = { 
{ "tot_pending", lprocfs_filter_rd_tot_pending, 0, 0 }, { "tot_granted", lprocfs_filter_rd_tot_granted, 0, 0 }, { "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 }, + { "hash_stats", lprocfs_obd_rd_hash, 0, 0 }, #ifdef CRAY_XT3 { "recovery_maxtime", lprocfs_obd_rd_recovery_maxtime, lprocfs_obd_wr_recovery_maxtime, 0}, diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 6145e1e809447778bbbab3fb7e4fbf30d1e699a5..8ea20368de9773862c9bcfec4c3b701f41685551 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -70,7 +70,7 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid) return NULL; } - c = ptlrpc_get_connection(peer, self, uuid); + c = ptlrpc_connection_get(peer, self, uuid); if (c) { memcpy(c->c_remote_uuid.uuid, uuid->uuid, sizeof(c->c_remote_uuid.uuid)); @@ -81,24 +81,6 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid) return c; } -void ptlrpc_readdress_connection(struct ptlrpc_connection *conn, - struct obd_uuid *uuid) -{ - lnet_nid_t self; - lnet_process_id_t peer; - int err; - - err = ptlrpc_uuid_to_peer(uuid, &peer, &self); - if (err != 0) { - CERROR("cannot find peer %s!\n", uuid->uuid); - return; - } - - conn->c_peer = peer; - conn->c_self = self; - return; -} - static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal) { struct ptlrpc_bulk_desc *desc; diff --git a/lustre/ptlrpc/connection.c b/lustre/ptlrpc/connection.c index 34b543ac861c02f51d6ff17d1b3f47530b80456d..9442bf77cf8d3802fe5cc9cf9aaec50acb75d462 100644 --- a/lustre/ptlrpc/connection.c +++ b/lustre/ptlrpc/connection.c @@ -46,208 +46,192 @@ #include "ptlrpc_internal.h" #include <class_hash.h> -static spinlock_t conn_lock; -static struct list_head conn_list; -static struct lustre_class_hash_body *conn_hash_body; -static struct lustre_class_hash_body *conn_unused_hash_body; +static lustre_hash_t *conn_hash = NULL; +static lustre_hash_ops_t conn_hash_ops; -extern struct 
lustre_hash_operations conn_hash_operations; - -void ptlrpc_dump_connection(void *obj, void *data) +struct ptlrpc_connection * +ptlrpc_connection_get(lnet_process_id_t peer, lnet_nid_t self, + struct obd_uuid *uuid) { - struct ptlrpc_connection *c = obj; + struct ptlrpc_connection *conn, *conn2; + ENTRY; - CERROR("Connection %p/%s has refcount %d (nid=%s->%s)\n", - c, c->c_remote_uuid.uuid, atomic_read(&c->c_refcount), - libcfs_nid2str(c->c_self), - libcfs_nid2str(c->c_peer.nid)); + conn = lustre_hash_lookup(conn_hash, &peer); + if (conn) + GOTO(out, conn); + + OBD_ALLOC_PTR(conn); + if (!conn) + RETURN(NULL); + + conn->c_peer = peer; + conn->c_self = self; + INIT_HLIST_NODE(&conn->c_hash); + atomic_set(&conn->c_refcount, 1); + if (uuid) + obd_str2uuid(&conn->c_remote_uuid, uuid->uuid); + + /* + * Add the newly created conn to the hash, on key collision we + * lost a racing addition and must destroy our newly allocated + * connection. The object which exists in the has will be + * returned and may be compared against out object. + */ + conn2 = lustre_hash_findadd_unique(conn_hash, &peer, &conn->c_hash); + if (conn != conn2) { + OBD_FREE_PTR(conn); + conn = conn2; + } + EXIT; +out: + CDEBUG(D_INFO, "conn=%p refcount %d to %s\n", + conn, atomic_read(&conn->c_refcount), + libcfs_nid2str(conn->c_peer.nid)); + return conn; } - -void ptlrpc_dump_connections(void) + +int ptlrpc_connection_put(struct ptlrpc_connection *conn) { + int rc = 0; ENTRY; + + if (!conn) + RETURN(rc); + + LASSERT(!hlist_unhashed(&conn->c_hash)); + + /* + * We do not remove connection from hashtable and + * do not free it even if last caller released ref, + * as we want to have it cached for the case it is + * needed again. + * + * Deallocating it and later creating new connection + * again would be wastful. This way we also avoid + * expensive locking to protect things from get/put + * race when found cached connection is freed by + * ptlrpc_connection_put(). 
+ * + * It will be freed later in module unload time, + * when ptlrpc_connection_fini()->lh_exit->conn_exit() + * path is called. + */ + if (atomic_dec_return(&conn->c_refcount) == 1) + rc = 1; - lustre_hash_iterate_all(conn_hash_body, ptlrpc_dump_connection, NULL); + CDEBUG(D_INFO, "PUT conn=%p refcount %d to %s\n", + conn, atomic_read(&conn->c_refcount), + libcfs_nid2str(conn->c_peer.nid)); - EXIT; + RETURN(rc); } - -struct ptlrpc_connection* -ptlrpc_lookup_conn_locked (lnet_process_id_t peer) + +struct ptlrpc_connection * +ptlrpc_connection_addref(struct ptlrpc_connection *conn) { - struct ptlrpc_connection *c = NULL; - int rc; - - c = lustre_hash_get_object_by_key(conn_hash_body, &peer); - if (c != NULL) - return c; - - c = lustre_hash_get_object_by_key(conn_unused_hash_body, &peer); - if (c != NULL) { - lustre_hash_delitem(conn_unused_hash_body, &peer, &c->c_hash); - rc = lustre_hash_additem_unique(conn_hash_body, &peer, - &c->c_hash); - if (rc) { - /* can't add - try with new item */ - OBD_FREE_PTR(c); - list_del(&c->c_link); - c = NULL; - } - } - - return c; -} + ENTRY; + atomic_inc(&conn->c_refcount); + CDEBUG(D_INFO, "conn=%p refcount %d to %s\n", + conn, atomic_read(&conn->c_refcount), + libcfs_nid2str(conn->c_peer.nid)); -struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer, - lnet_nid_t self, struct obd_uuid *uuid) + RETURN(conn); +} + +int ptlrpc_connection_init(void) { - struct ptlrpc_connection *c; - struct ptlrpc_connection *c2; - int rc = 0; ENTRY; - CDEBUG(D_INFO, "self %s peer %s\n", - libcfs_nid2str(self), libcfs_id2str(peer)); - - spin_lock(&conn_lock); - c = ptlrpc_lookup_conn_locked(peer); - spin_unlock(&conn_lock); - - if (c != NULL) - RETURN (c); - - OBD_ALLOC_PTR(c); - if (c == NULL) - RETURN (NULL); - - atomic_set(&c->c_refcount, 1); - c->c_peer = peer; - c->c_self = self; - INIT_HLIST_NODE(&c->c_hash); - CFS_INIT_LIST_HEAD(&c->c_link); - if (uuid != NULL) - obd_str2uuid(&c->c_remote_uuid, uuid->uuid); - - 
spin_lock(&conn_lock); - - c2 = ptlrpc_lookup_conn_locked(peer); - if (c2 == NULL) { - rc = lustre_hash_additem_unique(conn_hash_body, &peer, - &c->c_hash); - if (rc != 0) { - CERROR("Cannot add connection to conn_hash_body\n"); - goto out_conn; - } - list_add(&c->c_link, &conn_list); - } - -out_conn: - spin_unlock(&conn_lock); - - if (c2 == NULL && rc == 0) - RETURN (c); - - if (c != NULL) - OBD_FREE(c, sizeof(*c)); - RETURN (c2); + conn_hash = lustre_hash_init("CONN_HASH", 32, 32768, + &conn_hash_ops, LH_REHASH); + if (!conn_hash) + RETURN(-ENOMEM); + + RETURN(0); } - -int ptlrpc_put_connection(struct ptlrpc_connection *c) -{ - int rc = 0; - lnet_process_id_t peer; + +void ptlrpc_connection_fini(void) { ENTRY; + lustre_hash_exit(conn_hash); + EXIT; +} - if (c == NULL) { - CERROR("NULL connection\n"); - RETURN(0); - } +/* + * Hash operations for net_peer<->connection + */ +static unsigned +conn_hashfn(lustre_hash_t *lh, void *key, unsigned mask) +{ + return lh_djb2_hash(key, sizeof(lnet_process_id_t), mask); +} - peer = c->c_peer; +static int +conn_compare(void *key, struct hlist_node *hnode) +{ + struct ptlrpc_connection *conn; + lnet_process_id_t *conn_key; - CDEBUG (D_INFO, "connection=%p refcount %d to %s\n", - c, atomic_read(&c->c_refcount) - 1, - libcfs_nid2str(c->c_peer.nid)); + LASSERT(key != NULL); + conn_key = (lnet_process_id_t*)key; + conn = hlist_entry(hnode, struct ptlrpc_connection, c_hash); - spin_lock(&conn_lock); - LASSERT(!hlist_unhashed(&c->c_hash)); - spin_unlock(&conn_lock); + return conn_key->nid == conn->c_peer.nid && + conn_key->pid == conn->c_peer.pid; +} - if (atomic_dec_return(&c->c_refcount) == 1) { - spin_lock(&conn_lock); +static void * +conn_key(struct hlist_node *hnode) +{ + struct ptlrpc_connection *conn; + conn = hlist_entry(hnode, struct ptlrpc_connection, c_hash); + return &conn->c_peer; +} - lustre_hash_delitem(conn_hash_body, &peer, &c->c_hash); - rc = lustre_hash_additem_unique(conn_unused_hash_body, &peer, - &c->c_hash); - 
spin_unlock(&conn_lock); - if (rc != 0) { - CERROR("Cannot hash connection to conn_hash_body\n"); - GOTO(ret, rc); - } - rc = 1; - } +static void * +conn_get(struct hlist_node *hnode) +{ + struct ptlrpc_connection *conn; - if (atomic_read(&c->c_refcount) < 0) - CERROR("connection %p refcount %d!\n", - c, atomic_read(&c->c_refcount)); -ret : + conn = hlist_entry(hnode, struct ptlrpc_connection, c_hash); + atomic_inc(&conn->c_refcount); - RETURN(rc); + return conn; } -struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *c) +static void * +conn_put(struct hlist_node *hnode) { - ENTRY; - atomic_inc(&c->c_refcount); - CDEBUG (D_INFO, "connection=%p refcount %d to %s\n", - c, atomic_read(&c->c_refcount), - libcfs_nid2str(c->c_peer.nid)); - RETURN(c); -} + struct ptlrpc_connection *conn; -int ptlrpc_init_connection(void) -{ - int rc = 0; - CFS_INIT_LIST_HEAD(&conn_list); - rc = lustre_hash_init(&conn_hash_body, "CONN_HASH", - 128, &conn_hash_operations); - if (rc) - GOTO(ret, rc); - - rc = lustre_hash_init(&conn_unused_hash_body, "CONN_UNUSED_HASH", - 128, &conn_hash_operations); - if (rc) - GOTO(ret, rc); - - spin_lock_init(&conn_lock); -ret: - if (rc) { - lustre_hash_exit(&conn_hash_body); - lustre_hash_exit(&conn_unused_hash_body); - } - RETURN(rc); + conn = hlist_entry(hnode, struct ptlrpc_connection, c_hash); + atomic_dec(&conn->c_refcount); + + return conn; } -void ptlrpc_cleanup_connection(void) +static void +conn_exit(struct hlist_node *hnode) { - struct list_head *tmp, *pos; - struct ptlrpc_connection *c; - - spin_lock(&conn_lock); - - lustre_hash_exit(&conn_unused_hash_body); - lustre_hash_exit(&conn_hash_body); - - list_for_each_safe(tmp, pos, &conn_list) { - c = list_entry(tmp, struct ptlrpc_connection, c_link); - if (atomic_read(&c->c_refcount) != 0) - CERROR("Connection %p/%s has refcount %d (nid=%s)\n", - c, c->c_remote_uuid.uuid, - atomic_read(&c->c_refcount), - libcfs_nid2str(c->c_peer.nid)); - list_del(&c->c_link); - OBD_FREE(c, 
sizeof(*c)); - } - spin_unlock(&conn_lock); + struct ptlrpc_connection *conn; + + conn = hlist_entry(hnode, struct ptlrpc_connection, c_hash); + /* + * Nothing should be left. Connection user put it and + * connection also was deleted from table by this time + * so we should have 0 refs. + */ + LASSERTF(atomic_read(&conn->c_refcount) == 0, + "Busy connection with %d refs\n", + atomic_read(&conn->c_refcount)); + OBD_FREE_PTR(conn); } + +static lustre_hash_ops_t conn_hash_ops = { + .lh_hash = conn_hashfn, + .lh_compare = conn_compare, + .lh_key = conn_key, + .lh_get = conn_get, + .lh_put = conn_put, + .lh_exit = conn_exit, +}; diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index bfb7b3cbb6132cbaffd50c00eb5c7f6c3f55af1e..328a2c9e38c0571786100b4346e80d31435b1272 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -414,13 +414,13 @@ static int import_select_connection(struct obd_import *imp) /* switch connection, don't mind if it's same as the current one */ if (imp->imp_connection) - ptlrpc_put_connection(imp->imp_connection); + ptlrpc_connection_put(imp->imp_connection); imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn); dlmexp = class_conn2export(&imp->imp_dlm_handle); LASSERT(dlmexp != NULL); if (dlmexp->exp_connection) - ptlrpc_put_connection(dlmexp->exp_connection); + ptlrpc_connection_put(dlmexp->exp_connection); dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn); class_export_put(dlmexp); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 66aff1d6e47111668c067afb988f06ecb5036bdd..2551d9079ca2445c1a107f3476bd33a998a23464 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -406,7 +406,7 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int flags) } if (req->rq_export == NULL || req->rq_export->exp_connection == NULL) - conn = ptlrpc_get_connection(req->rq_peer, req->rq_self, NULL); + conn = ptlrpc_connection_get(req->rq_peer, req->rq_self, NULL); else conn = 
ptlrpc_connection_addref(req->rq_export->exp_connection); @@ -427,7 +427,7 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int flags) atomic_dec (&svc->srv_outstanding_replies); ptlrpc_req_drop_rs(req); } - ptlrpc_put_connection(conn); + ptlrpc_connection_put(conn); return rc; } diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index e0a1bf22c337437c0f42261e59045cfcc1ac23a7..9e8f5d29d7a18d7d24a6d8cab122aad140ce2ce9 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -74,12 +74,12 @@ __init int ptlrpc_init(void) RETURN(rc); cleanup_phase = 1; - rc = ptlrpc_init_connection(); + rc = ptlrpc_connection_init(); if (rc) GOTO(cleanup, rc); cleanup_phase = 2; - ptlrpc_put_connection_superhack = ptlrpc_put_connection; + ptlrpc_put_connection_superhack = ptlrpc_connection_put; rc = ptlrpc_start_pinger(); if (rc) @@ -113,7 +113,7 @@ cleanup: case 3: ptlrpc_stop_pinger(); case 2: - ptlrpc_cleanup_connection(); + ptlrpc_connection_fini(); case 1: ptlrpc_exit_portals(); default: ; @@ -129,18 +129,16 @@ static void __exit ptlrpc_exit(void) ldlm_exit(); ptlrpc_stop_pinger(); ptlrpc_exit_portals(); - ptlrpc_cleanup_connection(); + ptlrpc_connection_fini(); cfs_mem_cache_destroy(ptlrpc_cbdata_slab); } /* connection.c */ -EXPORT_SYMBOL(ptlrpc_dump_connections); -EXPORT_SYMBOL(ptlrpc_readdress_connection); -EXPORT_SYMBOL(ptlrpc_get_connection); -EXPORT_SYMBOL(ptlrpc_put_connection); +EXPORT_SYMBOL(ptlrpc_connection_get); +EXPORT_SYMBOL(ptlrpc_connection_put); EXPORT_SYMBOL(ptlrpc_connection_addref); -EXPORT_SYMBOL(ptlrpc_init_connection); -EXPORT_SYMBOL(ptlrpc_cleanup_connection); +EXPORT_SYMBOL(ptlrpc_connection_init); +EXPORT_SYMBOL(ptlrpc_connection_fini); /* niobuf.c */ EXPORT_SYMBOL(ptlrpc_start_bulk_transfer); diff --git a/lustre/quota/quota_adjust_qunit.c b/lustre/quota/quota_adjust_qunit.c index d2a33ddc4427625a094607021f3600a3e40e7279..206cb4508bc4ee9e65068e1be30a7d26957be1ae 100644 --- 
a/lustre/quota/quota_adjust_qunit.c +++ b/lustre/quota/quota_adjust_qunit.c @@ -143,8 +143,7 @@ int quota_search_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, oqaq_tmp = oqaq; } - *lqs_return = lustre_hash_get_object_by_key(LQC_HASH_BODY(qctxt), - oqaq_tmp); + *lqs_return = lustre_hash_lookup(qctxt->lqc_lqs_hash, oqaq_tmp); if (*lqs_return) LQS_DEBUG((*lqs_return), "show lqs\n"); @@ -157,45 +156,42 @@ int quota_create_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, struct lustre_quota_ctxt *qctxt, struct lustre_qunit_size **lqs_return) { - int rc = 0; - struct quota_adjust_qunit *oqaq_tmp = NULL; struct lustre_qunit_size *lqs = NULL; + int rc = 0; ENTRY; LASSERT(*lqs_return == NULL); LASSERT(oqaq || qdata); - if (!oqaq) { - OBD_ALLOC_PTR(oqaq_tmp); - if (!oqaq_tmp) - RETURN(-ENOMEM); - qdata_to_oqaq(qdata, oqaq_tmp); - } else { - oqaq_tmp = oqaq; - } - OBD_ALLOC_PTR(lqs); if (!lqs) GOTO(out, rc = -ENOMEM); + if (!oqaq) { + qdata_to_oqaq(qdata, &lqs->lqs_key); + } else { + lqs->lqs_key = *oqaq; + } + spin_lock_init(&lqs->lqs_lock); lqs->lqs_bwrite_pending = 0; lqs->lqs_iwrite_pending = 0; lqs->lqs_ino_rec = 0; lqs->lqs_blk_rec = 0; - lqs->lqs_id = oqaq_tmp->qaq_id; - lqs->lqs_flags = QAQ_IS_GRP(oqaq_tmp); + lqs->lqs_id = lqs->lqs_key.qaq_id; + lqs->lqs_flags = QAQ_IS_GRP(&lqs->lqs_key); lqs->lqs_bunit_sz = qctxt->lqc_bunit_sz; lqs->lqs_iunit_sz = qctxt->lqc_iunit_sz; lqs->lqs_btune_sz = qctxt->lqc_btune_sz; lqs->lqs_itune_sz = qctxt->lqc_itune_sz; + lqs->lqs_ctxt = qctxt; if (qctxt->lqc_handler) { lqs->lqs_last_bshrink = 0; lqs->lqs_last_ishrink = 0; } lqs_initref(lqs); - rc = lustre_hash_additem_unique(LQC_HASH_BODY(qctxt), - oqaq_tmp, &lqs->lqs_hash); + rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash, + &lqs->lqs_key, &lqs->lqs_hash); LQS_DEBUG(lqs, "create lqs\n"); if (!rc) { lqs_getref(lqs); @@ -204,8 +200,6 @@ int quota_create_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, out: if (rc && lqs) OBD_FREE_PTR(lqs); - 
if (!oqaq) - OBD_FREE_PTR(oqaq_tmp); RETURN(rc); } @@ -233,7 +227,7 @@ search_lqs: LQS_DEBUG(lqs, "release lqs\n"); /* this is for quota_search_lqs */ lqs_putref(lqs); - /* this is for deleting this lqs */ + /* kill lqs */ lqs_putref(lqs); } RETURN(rc); diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index d2402cc7c472275d3941c952f18fcdfb09a67f93..0d8f62b74b236eab5bd4de544e74c5dd432ba27c 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -61,7 +61,7 @@ #include <lprocfs_status.h> #include "quota_internal.h" -extern struct lustre_hash_operations lqs_hash_operations; +static lustre_hash_ops_t lqs_hash_ops; unsigned long default_bunit_sz = 128 * 1024 * 1024; /* 128M bytes */ unsigned long default_btune_ratio = 50; /* 50 percentage */ @@ -1112,6 +1112,7 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) if (rc) RETURN(rc); + cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); spin_lock_init(&qctxt->lqc_lock); spin_lock(&qctxt->lqc_lock); qctxt->lqc_handler = handler; @@ -1132,16 +1133,13 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) qctxt->lqc_itune_sz = default_iunit_sz * default_itune_ratio / 100; qctxt->lqc_switch_seconds = 300; /* enlarging will wait 5 minutes * after the last shrinking */ - rc = lustre_hash_init(&LQC_HASH_BODY(qctxt), "LQS_HASH",128, - &lqs_hash_operations); - if (rc) { - CDEBUG(D_ERROR, "initialize hash lqs for %s error!\n", - obd->obd_name); - lustre_hash_exit(&LQC_HASH_BODY(qctxt)); - } - cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); spin_unlock(&qctxt->lqc_lock); + qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", 128, 128, + &lqs_hash_ops, 0); + if (!qctxt->lqc_lqs_hash) + CERROR("initialize hash lqs for %s error!\n", obd->obd_name); + #ifdef LPROCFS if (lquota_proc_setup(obd, is_master(obd, qctxt, 0, 0))) CERROR("initialize proc for %s error!\n", obd->obd_name); @@ -1184,7 +1182,8 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) qunit_put(qunit); } 
- lustre_hash_exit(&LQC_HASH_BODY(qctxt)); + lustre_hash_exit(qctxt->lqc_lqs_hash); + /* after qctxt_cleanup, qctxt might be freed, then check_qm() is * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */ while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) { @@ -1313,3 +1312,98 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt) exit: EXIT; } + +/* + * lqs<->qctxt hash operations + */ + +/* string hashing using djb2 hash algorithm */ +static unsigned +lqs_hash(lustre_hash_t *lh, void *key, unsigned mask) +{ + struct quota_adjust_qunit *lqs_key; + unsigned hash; + ENTRY; + + LASSERT(key); + lqs_key = (struct quota_adjust_qunit *)key; + hash = (QAQ_IS_GRP(lqs_key) ? 5381 : 5387) * lqs_key->qaq_id; + + RETURN(hash & mask); +} + +static int +lqs_compare(void *key, struct hlist_node *hnode) +{ + struct quota_adjust_qunit *lqs_key; + struct lustre_qunit_size *q; + int rc; + ENTRY; + + LASSERT(key); + lqs_key = (struct quota_adjust_qunit *)key; + q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + + spin_lock(&q->lqs_lock); + rc = ((lqs_key->qaq_id == q->lqs_id) && + (QAQ_IS_GRP(lqs_key) == LQS_IS_GRP(q))); + spin_unlock(&q->lqs_lock); + + RETURN(rc); +} + +static void * +lqs_get(struct hlist_node *hnode) +{ + struct lustre_qunit_size *q = + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + ENTRY; + + atomic_inc(&q->lqs_refcount); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", + q, atomic_read(&q->lqs_refcount)); + + RETURN(q); +} + +static void * +lqs_put(struct hlist_node *hnode) +{ + struct lustre_qunit_size *q = + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + ENTRY; + + LASSERT(atomic_read(&q->lqs_refcount) > 0); + atomic_dec(&q->lqs_refcount); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", + q, atomic_read(&q->lqs_refcount)); + + RETURN(q); +} + +static void +lqs_exit(struct hlist_node *hnode) +{ + struct lustre_qunit_size *q; + ENTRY; + + q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); 
+ /* + * Nothing should be left. User of lqs put it and + * lqs also was deleted from table by this time + * so we should have 0 refs. + */ + LASSERTF(atomic_read(&q->lqs_refcount) == 0, + "Busy lqs %p with %d refs\n", q, + atomic_read(&q->lqs_refcount)); + OBD_FREE_PTR(q); + EXIT; +} + +static lustre_hash_ops_t lqs_hash_ops = { + .lh_hash = lqs_hash, + .lh_compare = lqs_compare, + .lh_get = lqs_get, + .lh_put = lqs_put, + .lh_exit = lqs_exit +};