From 8073f0e4bef2db551a4b4bcaeb72a9986571f1bd Mon Sep 17 00:00:00 2001 From: yury <yury> Date: Sat, 13 Sep 2008 17:47:04 +0000 Subject: [PATCH] b=16777 r=adilger,robert - fixes server side scalability issue which became visible with lru resize work landed. It replaces linear list of held locks on server with hash table from class_hash.c which improves lock find time drastically for case when one client holds ~limit of server locks (quite a big number for big servers) which is very possible if all clients are idle and one compiles kernel, etc. --- lustre/include/lustre_dlm.h | 162 +++++++++++++++++++-------- lustre/include/lustre_export.h | 9 +- lustre/ldlm/ldlm_flock.c | 9 +- lustre/ldlm/ldlm_lib.c | 1 + lustre/ldlm/ldlm_lock.c | 50 ++++----- lustre/ldlm/ldlm_lockd.c | 182 +++++++++++++++++++++---------- lustre/ldlm/ldlm_request.c | 25 ++++- lustre/mdt/mdt_handler.c | 48 ++++---- lustre/mgs/mgs_handler.c | 7 ++ lustre/obdclass/genops.c | 5 +- lustre/obdclass/lprocfs_status.c | 56 +++++++++- lustre/obdecho/echo.c | 7 ++ lustre/obdfilter/filter.c | 3 +- 13 files changed, 384 insertions(+), 180 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 7c65539e03..c254e32a84 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -543,81 +543,153 @@ struct ldlm_interval_tree { }; struct ldlm_lock { - struct portals_handle l_handle; // must be first in the structure - atomic_t l_refc; - - /* internal spinlock protects l_resource. we should hold this lock - * first before grabbing res_lock.*/ - spinlock_t l_lock; - - /* ldlm_lock_change_resource() can change this */ - struct ldlm_resource *l_resource; - - /* protected by ns_hash_lock. 
FIXME */ - struct list_head l_lru; - - /* protected by lr_lock, linkage to resource's lock queues */ - struct list_head l_res_link; - - struct ldlm_interval *l_tree_node; /* tree node for ldlm_extent */ - - /* protected by led_lock */ - struct list_head l_export_chain; // per-export chain of locks - - /* protected by lr_lock */ - ldlm_mode_t l_req_mode; - ldlm_mode_t l_granted_mode; - + /** + * Must be first in the structure. + */ + struct portals_handle l_handle; + /** + * Lock reference count. + */ + atomic_t l_refc; + /** + * Internal spinlock protects l_resource. we should hold this lock + * first before grabbing res_lock. + */ + spinlock_t l_lock; + /** + * ldlm_lock_change_resource() can change this. + */ + struct ldlm_resource *l_resource; + /** + * Protected by ns_hash_lock. List item for client side lru list. + */ + struct list_head l_lru; + /** + * Protected by lr_lock, linkage to resource's lock queues. + */ + struct list_head l_res_link; + /** + * Tree node for ldlm_extent. + */ + struct ldlm_interval *l_tree_node; + /** + * Protected by per-bucket exp->exp_lock_hash locks. Per export hash + * of locks. + */ + struct hlist_node l_exp_hash; + /** + * Protected by lr_lock. Requested mode. + */ + ldlm_mode_t l_req_mode; + /** + * Granted mode, also protected by lr_lock. + */ + ldlm_mode_t l_granted_mode; + /** + * Lock enqueue completion handler. + */ ldlm_completion_callback l_completion_ast; + /** + * Lock blocking ast handler. + */ ldlm_blocking_callback l_blocking_ast; + /** + * Lock glimpse handler. + */ ldlm_glimpse_callback l_glimpse_ast; - struct obd_export *l_export; - struct obd_export *l_conn_export; + /** + * Lock export. + */ + struct obd_export *l_export; + /** + * Lock connection export. + */ + struct obd_export *l_conn_export; - struct lustre_handle l_remote_handle; - ldlm_policy_data_t l_policy_data; + /** + * Remote lock handle. 
+ */ + struct lustre_handle l_remote_handle; - /* protected by lr_lock */ + ldlm_policy_data_t l_policy_data; + + /* + * Protected by lr_lock. Various counters: readers, writers, etc. + */ __u32 l_flags; __u32 l_readers; __u32 l_writers; __u8 l_destroyed; - /* If the lock is granted, a process sleeps on this waitq to learn when + /** + * If the lock is granted, a process sleeps on this waitq to learn when * it's no longer in use. If the lock is not granted, a process sleeps - * on this waitq to learn when it becomes granted. */ + * on this waitq to learn when it becomes granted. + */ cfs_waitq_t l_waitq; + struct timeval l_enqueued_time; - cfs_time_t l_last_used; /* jiffies */ + /** + * Jiffies. Should be converted to time if needed. + */ + cfs_time_t l_last_used; + struct ldlm_extent l_req_extent; - /* Client-side-only members */ - __u32 l_lvb_len; /* temporary storage for */ - void *l_lvb_data; /* an LVB received during */ - void *l_lvb_swabber; /* an enqueue */ + /* + * Client-side-only members. + */ + + /** + * Temporary storage for an LVB received during an enqueue operation. + */ + __u32 l_lvb_len; + void *l_lvb_data; + void *l_lvb_swabber; + void *l_ast_data; spinlock_t l_extents_list_lock; struct list_head l_extents_list; struct list_head l_cache_locks_list; - /* Server-side-only members */ + /* + * Server-side-only members. + */ - /* protected by elt_lock */ - struct list_head l_pending_chain; /* callbacks pending */ - cfs_time_t l_callback_timeout; /* jiffies */ + /** + * Protected by elt_lock. Callbacks pending. + */ + struct list_head l_pending_chain; - __u32 l_pid; /* pid which created this lock */ + cfs_time_t l_callback_timeout; - /* for ldlm_add_ast_work_item() */ + /** + * Pid which created this lock. + */ + __u32 l_pid; + + /** + * For ldlm_add_ast_work_item(). + */ struct list_head l_bl_ast; + /** + * For ldlm_add_ast_work_item(). + */ struct list_head l_cp_ast; + /** + * For ldlm_add_ast_work_item(). 
+ */ + struct list_head l_rk_ast; + struct ldlm_lock *l_blocking_lock; int l_bl_ast_run; - /* protected by lr_lock, linkages to "skip lists" */ + /** + * Protected by lr_lock, linkages to "skip lists". + */ struct list_head l_sl_mode; struct list_head l_sl_policy; }; @@ -767,6 +839,8 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock); void ldlm_revoke_export_locks(struct obd_export *exp); int ldlm_get_ref(void); void ldlm_put_ref(void); +int ldlm_init_export(struct obd_export *exp); +void ldlm_destroy_export(struct obd_export *exp); /* ldlm_lock.c */ ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res); diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 387bf9e20c..55961d16ae 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -40,6 +40,7 @@ #include <lustre/lustre_idl.h> #include <lustre_dlm.h> #include <lprocfs_status.h> +#include <class_hash.h> /* Data stored per client in the last_rcvd file. In le32 order. 
*/ struct mds_client_data; @@ -76,11 +77,6 @@ struct osc_creator { cfs_waitq_t oscc_waitq; /* creating procs wait on this */ }; -struct ldlm_export_data { - struct list_head led_held_locks; /* protected by led_lock */ - spinlock_t led_lock; -}; - struct ec_export_data { /* echo client */ struct list_head eced_locks; }; @@ -128,7 +124,8 @@ struct obd_export { struct lprocfs_stats *exp_ldlm_stats; struct ptlrpc_connection *exp_connection; __u32 exp_conn_cnt; - struct ldlm_export_data exp_ldlm_data; + lustre_hash_t *exp_lock_hash; /* existing lock hash */ + spinlock_t exp_lock_hash_lock; struct list_head exp_outstanding_replies; time_t exp_last_request_time; struct list_head exp_req_replay_queue; diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 86e8cdcef2..fc3b59313c 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -398,10 +398,11 @@ reprocess: new2->l_conn_export = lock->l_conn_export; if (lock->l_export != NULL) { new2->l_export = class_export_get(lock->l_export); - spin_lock(&new2->l_export->exp_ldlm_data.led_lock); - list_add(&new2->l_export_chain, - &new2->l_export->exp_ldlm_data.led_held_locks); - spin_unlock(&new2->l_export->exp_ldlm_data.led_lock); + if (new2->l_export->exp_lock_hash && + hlist_unhashed(&new2->l_exp_hash)) + lustre_hash_add(new2->l_export->exp_lock_hash, + &new2->l_remote_handle, + &new2->l_exp_hash); } if (*flags == LDLM_FL_WAIT_NOREPROC) { ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode); diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index c6de21ff9c..3978b7156d 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -909,6 +909,7 @@ dont_check_exports: &remote_uuid); spin_lock(&target->obd_dev_lock); + /* Export might be hashed already, e.g. 
if this is reconnect */ if (hlist_unhashed(&export->exp_nid_hash)) lustre_hash_add(export->exp_obd->obd_nid_hash, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 5678333ebd..27965f4f4f 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -265,11 +265,9 @@ int ldlm_lock_destroy_internal(struct ldlm_lock *lock) } lock->l_destroyed = 1; - if (lock->l_export) - spin_lock(&lock->l_export->exp_ldlm_data.led_lock); - list_del_init(&lock->l_export_chain); - if (lock->l_export) - spin_unlock(&lock->l_export->exp_ldlm_data.led_lock); + if (lock->l_export && lock->l_export->exp_lock_hash) + lustre_hash_del(lock->l_export->exp_lock_hash, + &lock->l_remote_handle, &lock->l_exp_hash); ldlm_lock_remove_from_lru(lock); class_handle_unhash(&lock->l_handle); @@ -343,14 +341,15 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) atomic_set(&lock->l_refc, 2); CFS_INIT_LIST_HEAD(&lock->l_res_link); CFS_INIT_LIST_HEAD(&lock->l_lru); - CFS_INIT_LIST_HEAD(&lock->l_export_chain); CFS_INIT_LIST_HEAD(&lock->l_pending_chain); CFS_INIT_LIST_HEAD(&lock->l_bl_ast); CFS_INIT_LIST_HEAD(&lock->l_cp_ast); + CFS_INIT_LIST_HEAD(&lock->l_rk_ast); cfs_waitq_init(&lock->l_waitq); lock->l_blocking_lock = NULL; CFS_INIT_LIST_HEAD(&lock->l_sl_mode); CFS_INIT_LIST_HEAD(&lock->l_sl_policy); + CFS_INIT_HLIST_NODE(&lock->l_exp_hash); atomic_inc(&resource->lr_namespace->ns_locks); CFS_INIT_LIST_HEAD(&lock->l_handle.h_link); @@ -1444,10 +1443,10 @@ static int ldlm_work_revoke_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) { struct ldlm_lock_desc desc; - struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_export_chain); + struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_rk_ast); ENTRY; - list_del_init(&lock->l_export_chain); + list_del_init(&lock->l_rk_ast); /* the desc just pretend to exclusive */ ldlm_lock2desc(lock, &desc); @@ -1660,30 +1659,27 @@ int ldlm_lock_set_data(struct lustre_handle *lockh, void *data) 
RETURN(0); } -void ldlm_cancel_locks_for_export(struct obd_export *exp) +void ldlm_cancel_locks_for_export_cb(void *obj, void *data) { - struct ldlm_lock *lock; + struct obd_export *exp = data; + struct ldlm_lock *lock = obj; struct ldlm_resource *res; - spin_lock(&exp->exp_ldlm_data.led_lock); - while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) { - lock = list_entry(exp->exp_ldlm_data.led_held_locks.next, - struct ldlm_lock, l_export_chain); - res = ldlm_resource_getref(lock->l_resource); - LDLM_LOCK_GET(lock); - spin_unlock(&exp->exp_ldlm_data.led_lock); - - LDLM_DEBUG(lock, "export %p", exp); - ldlm_res_lvbo_update(res, NULL, 0, 1); + res = ldlm_resource_getref(lock->l_resource); + LDLM_LOCK_GET(lock); - ldlm_lock_cancel(lock); - ldlm_reprocess_all(res); + LDLM_DEBUG(lock, "export %p", exp); + ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_lock_cancel(lock); + ldlm_reprocess_all(res); + ldlm_resource_putref(res); + LDLM_LOCK_PUT(lock); +} - ldlm_resource_putref(res); - LDLM_LOCK_PUT(lock); - spin_lock(&exp->exp_ldlm_data.led_lock); - } - spin_unlock(&exp->exp_ldlm_data.led_lock); +void ldlm_cancel_locks_for_export(struct obd_export *exp) +{ + lustre_hash_for_each_empty(exp->exp_lock_hash, + ldlm_cancel_locks_for_export_cb, exp); } struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 88afb3f7ca..870ae2320e 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -839,26 +839,6 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) RETURN(rc); } -static struct ldlm_lock * -find_existing_lock(struct obd_export *exp, - const struct lustre_handle *remote_hdl) -{ - struct list_head *iter; - - spin_lock(&exp->exp_ldlm_data.led_lock); - list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { - struct ldlm_lock *lock; - lock = list_entry(iter, struct ldlm_lock, l_export_chain); - if (lock->l_remote_handle.cookie == remote_hdl->cookie) { - 
LDLM_LOCK_GET(lock); - spin_unlock(&exp->exp_ldlm_data.led_lock); - return lock; - } - } - spin_unlock(&exp->exp_ldlm_data.led_lock); - return NULL; -} - #ifdef __KERNEL__ extern unsigned long long lu_time_stamp_get(void); #else @@ -979,8 +959,9 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, #endif if (unlikely(flags & LDLM_FL_REPLAY)) { - lock = find_existing_lock(req->rq_export, - &dlm_req->lock_handle[0]); + /* Find an existing lock in the per-export lock hash */ + lock = lustre_hash_lookup(req->rq_export->exp_lock_hash, + (void *)&dlm_req->lock_handle[0]); if (lock != NULL) { DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie " LPX64, lock->l_handle.h_cookie); @@ -1010,10 +991,11 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, GOTO(out, rc = -ENOTCONN); } lock->l_export = class_export_get(req->rq_export); - spin_lock(&lock->l_export->exp_ldlm_data.led_lock); - list_add(&lock->l_export_chain, - &lock->l_export->exp_ldlm_data.led_held_locks); - spin_unlock(&lock->l_export->exp_ldlm_data.led_lock); + + if (lock->l_export->exp_lock_hash) + lustre_hash_add(lock->l_export->exp_lock_hash, + &lock->l_remote_handle, + &lock->l_exp_hash); existing_lock: @@ -1829,47 +1811,51 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) RETURN(0); } -void ldlm_revoke_export_locks(struct obd_export *exp) +void ldlm_revoke_lock_cb(void *obj, void *data) { - struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks; - struct list_head rpc_list; - struct ldlm_lock *lock, *next; + struct list_head *rpc_list = data; + struct ldlm_lock *lock = obj; - ENTRY; - CFS_INIT_LIST_HEAD(&rpc_list); + lock_res_and_lock(lock); - spin_lock(&exp->exp_ldlm_data.led_lock); - list_for_each_entry_safe(lock, next, locklist, l_export_chain) { - lock_res_and_lock(lock); + if (lock->l_req_mode != lock->l_granted_mode) { + unlock_res_and_lock(lock); + return; + } - if (lock->l_req_mode != lock->l_granted_mode) { - unlock_res_and_lock(lock); - continue; - } + 
LASSERT(lock->l_resource); + if (lock->l_resource->lr_type != LDLM_IBITS && + lock->l_resource->lr_type != LDLM_PLAIN) { + unlock_res_and_lock(lock); + return; + } - LASSERT(lock->l_resource); - if (lock->l_resource->lr_type != LDLM_IBITS && - lock->l_resource->lr_type != LDLM_PLAIN) { - unlock_res_and_lock(lock); - continue; - } + if (lock->l_flags & LDLM_FL_AST_SENT) { + unlock_res_and_lock(lock); + return; + } - if (lock->l_flags & LDLM_FL_AST_SENT) { - unlock_res_and_lock(lock); - continue; - } + LASSERT(lock->l_blocking_ast); + LASSERT(!lock->l_blocking_lock); - LASSERT(lock->l_blocking_ast); - LASSERT(!lock->l_blocking_lock); + lock->l_flags |= LDLM_FL_AST_SENT; + if (lock->l_export && lock->l_export->exp_lock_hash) + lustre_hash_del(lock->l_export->exp_lock_hash, + &lock->l_remote_handle, &lock->l_exp_hash); + list_add_tail(&lock->l_rk_ast, rpc_list); + LDLM_LOCK_GET(lock); - lock->l_flags |= LDLM_FL_AST_SENT; - list_move(&lock->l_export_chain, &rpc_list); - LDLM_LOCK_GET(lock); + unlock_res_and_lock(lock); +} - unlock_res_and_lock(lock); - } - spin_unlock(&exp->exp_ldlm_data.led_lock); +void ldlm_revoke_export_locks(struct obd_export *exp) +{ + struct list_head rpc_list; + ENTRY; + CFS_INIT_LIST_HEAD(&rpc_list); + lustre_hash_for_each_empty(exp->exp_lock_hash, + ldlm_revoke_lock_cb, &rpc_list); ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST); EXIT; @@ -2038,6 +2024,88 @@ void ldlm_put_ref(void) EXIT; } +/* + * Export handle<->lock hash operations. 
+ */ +static unsigned +ldlm_export_lock_hash(lustre_hash_t *lh, void *key, unsigned mask) +{ + return lh_u64_hash(((struct lustre_handle *)key)->cookie, mask); +} + +static void * +ldlm_export_lock_key(struct hlist_node *hnode) +{ + struct ldlm_lock *lock; + ENTRY; + + lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); + RETURN(&lock->l_remote_handle); +} + +static int +ldlm_export_lock_compare(void *key, struct hlist_node *hnode) +{ + ENTRY; + RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key)); +} + +static void * +ldlm_export_lock_get(struct hlist_node *hnode) +{ + struct ldlm_lock *lock; + ENTRY; + + lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); + LDLM_LOCK_GET(lock); + + RETURN(lock); +} + +static void * +ldlm_export_lock_put(struct hlist_node *hnode) +{ + struct ldlm_lock *lock; + ENTRY; + + lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); + LDLM_LOCK_PUT(lock); + + RETURN(lock); +} + +static lustre_hash_ops_t ldlm_export_lock_ops = { + .lh_hash = ldlm_export_lock_hash, + .lh_key = ldlm_export_lock_key, + .lh_compare = ldlm_export_lock_compare, + .lh_get = ldlm_export_lock_get, + .lh_put = ldlm_export_lock_put +}; + +int ldlm_init_export(struct obd_export *exp) +{ + ENTRY; + + exp->exp_lock_hash = + lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid), + 128, 65536, &ldlm_export_lock_ops, LH_REHASH); + + if (!exp->exp_lock_hash) + RETURN(-ENOMEM); + + RETURN(0); +} +EXPORT_SYMBOL(ldlm_init_export); + +void ldlm_destroy_export(struct obd_export *exp) +{ + ENTRY; + lustre_hash_exit(exp->exp_lock_hash); + exp->exp_lock_hash = NULL; + EXIT; +} +EXPORT_SYMBOL(ldlm_destroy_export); + static int ldlm_setup(void) { struct ldlm_bl_pool *blp; diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index a75ef68b7d..99f211ee65 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -377,6 +377,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, { struct ldlm_namespace *ns 
= exp->exp_obd->obd_namespace; int is_replay = *flags & LDLM_FL_REPLAY; + struct lustre_handle old_hash_key; struct ldlm_lock *lock; struct ldlm_reply *reply; int cleanup_phase = 1; @@ -425,7 +426,15 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, cleanup_phase = 0; lock_res_and_lock(lock); + old_hash_key = lock->l_remote_handle; lock->l_remote_handle = reply->lock_handle; + + /* Key change rehash lock in per-export hash with new key */ + if (exp->exp_lock_hash) + lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key, + &lock->l_remote_handle, + &lock->l_exp_hash); + *flags = reply->lock_flags; lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS; /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match() @@ -1973,8 +1982,10 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure) static int replay_lock_interpret(struct ptlrpc_request *req, struct ldlm_async_args *aa, int rc) { - struct ldlm_lock *lock; - struct ldlm_reply *reply; + struct lustre_handle old_hash_key; + struct ldlm_lock *lock; + struct ldlm_reply *reply; + struct obd_export *exp; ENTRY; atomic_dec(&req->rq_import->imp_replay_inflight); @@ -1996,7 +2007,16 @@ static int replay_lock_interpret(struct ptlrpc_request *req, GOTO(out, rc = -ESTALE); } + old_hash_key = lock->l_remote_handle; lock->l_remote_handle = reply->lock_handle; + + /* Key change rehash lock in per-export hash with new key */ + exp = req->rq_export; + if (exp && exp->exp_lock_hash) + lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key, + &lock->l_remote_handle, + &lock->l_exp_hash); + LDLM_DEBUG(lock, "replayed lock:"); ptlrpc_import_recovery_state_machine(req->rq_import); LDLM_LOCK_PUT(lock); @@ -2004,7 +2024,6 @@ out: if (rc != ELDLM_OK) ptlrpc_connect_import(req->rq_import, NULL); - RETURN(rc); } diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index d746f0ef97..bc457d444a 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ 
-2712,27 +2712,23 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, RETURN(ELDLM_LOCK_REPLACED); } - /* This lock might already be given to the client by an resent req, - * in this case we should return ELDLM_LOCK_ABORTED, - * so we should check led_held_locks here, but it will affect - * performance, FIXME + /* + * Fixup the lock to be given to the client. */ - /* Fixup the lock to be given to the client */ lock_res_and_lock(new_lock); new_lock->l_readers = 0; new_lock->l_writers = 0; new_lock->l_export = class_export_get(req->rq_export); - spin_lock(&req->rq_export->exp_ldlm_data.led_lock); - list_add(&new_lock->l_export_chain, - &new_lock->l_export->exp_ldlm_data.led_held_locks); - spin_unlock(&req->rq_export->exp_ldlm_data.led_lock); - new_lock->l_blocking_ast = lock->l_blocking_ast; new_lock->l_completion_ast = lock->l_completion_ast; new_lock->l_remote_handle = lock->l_remote_handle; new_lock->l_flags &= ~LDLM_FL_LOCAL; + lustre_hash_add(new_lock->l_export->exp_lock_hash, + &new_lock->l_remote_handle, + &new_lock->l_exp_hash); + unlock_res_and_lock(new_lock); LDLM_LOCK_PUT(new_lock); lh->mlh_reg_lh.cookie = 0; @@ -2749,7 +2745,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info, struct obd_export *exp = req->rq_export; struct lustre_handle remote_hdl; struct ldlm_request *dlmreq; - struct list_head *iter; + struct ldlm_lock *lock; if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) return; @@ -2757,27 +2753,24 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info, dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ); remote_hdl = dlmreq->lock_handle[0]; - spin_lock(&exp->exp_ldlm_data.led_lock); - list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { - struct ldlm_lock *lock; - lock = list_entry(iter, struct ldlm_lock, l_export_chain); - if (lock == new_lock) - continue; - if (lock->l_remote_handle.cookie == remote_hdl.cookie) { + lock = lustre_hash_lookup(exp->exp_lock_hash, &remote_hdl); + if 
(lock) { + if (lock != new_lock) { lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie; lh->mlh_reg_mode = lock->l_granted_mode; - LDLM_DEBUG(lock, "restoring lock cookie"); + LDLM_DEBUG(lock, "Restoring lock cookie"); DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie "LPX64, lh->mlh_reg_lh.cookie); if (old_lock) *old_lock = LDLM_LOCK_GET(lock); - spin_unlock(&exp->exp_ldlm_data.led_lock); + lh_put(exp->exp_lock_hash, &lock->l_exp_hash); return; } + + lh_put(exp->exp_lock_hash, &lock->l_exp_hash); } - spin_unlock(&exp->exp_ldlm_data.led_lock); /* * If the xid matches, then we know this is a resent request, and allow @@ -3040,12 +3033,6 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, if (it != NULL) { const struct ldlm_request *dlmreq; __u64 req_bits; -#if 0 - struct ldlm_lock *lock = *lockp; - - LDLM_DEBUG(lock, "intent policy opc: %s\n", - ldlm_it2str(it->opc)); -#endif rc = mdt_intent_opc(it->opc, info, lockp, flags); if (rc == 0) @@ -4482,6 +4469,7 @@ static int mdt_obd_disconnect(struct obd_export *exp) static int mdt_init_export(struct obd_export *exp) { struct mdt_export_data *med = &exp->exp_mdt_data; + int rc; ENTRY; CFS_INIT_LIST_HEAD(&med->med_open_head); @@ -4491,7 +4479,10 @@ static int mdt_init_export(struct obd_export *exp) spin_lock(&exp->exp_lock); exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); - RETURN(0); + rc = ldlm_init_export(exp); + if (rc) + CERROR("Error %d while initializing export\n", rc); + RETURN(rc); } static int mdt_destroy_export(struct obd_export *export) @@ -4512,6 +4503,7 @@ static int mdt_destroy_export(struct obd_export *export) mdt_cleanup_idmap(med); target_destroy_export(export); + ldlm_destroy_export(export); if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid)) RETURN(0); diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index afb32553ed..3208572598 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -694,11 +694,17 @@ out: RETURN(0); } +static inline int 
mgs_init_export(struct obd_export *exp) +{ + return ldlm_init_export(exp); +} + static inline int mgs_destroy_export(struct obd_export *exp) { ENTRY; target_destroy_export(exp); + ldlm_destroy_export(exp); RETURN(0); } @@ -808,6 +814,7 @@ static struct obd_ops mgs_obd_ops = { .o_setup = mgs_setup, .o_precleanup = mgs_precleanup, .o_cleanup = mgs_cleanup, + .o_init_export = mgs_init_export, .o_destroy_export = mgs_destroy_export, .o_iocontrol = mgs_iocontrol, .o_llog_init = mgs_llog_init, diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 3eb7073b8b..2a92609c52 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -735,15 +735,12 @@ struct obd_export *class_new_export(struct obd_device *obd, return ERR_PTR(-ENOMEM); export->exp_conn_cnt = 0; + export->exp_lock_hash = NULL; atomic_set(&export->exp_refcount, 2); atomic_set(&export->exp_rpc_count, 0); export->exp_obd = obd; CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies); CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue); - /* XXX this should be in LDLM init */ - CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks); - spin_lock_init(&export->exp_ldlm_data.led_lock); - CFS_INIT_LIST_HEAD(&export->exp_handle.h_link); class_handle_hash(&export->exp_handle, export_handle_addref); export->exp_last_request_time = cfs_time_current_sec(); diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index cef248dca7..a0e09127f9 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -1352,6 +1352,16 @@ struct exp_uuid_cb_data { int *len; }; +static void +lprocfs_exp_rd_cb_data_init(struct exp_uuid_cb_data *cb_data, char *page, + int count, int *eof, int *len) +{ + cb_data->page = page; + cb_data->count = count; + cb_data->eof = eof; + cb_data->len = len; +} + void lprocfs_exp_print_uuid(void *obj, void *cb_data) { struct obd_export *exp = (struct obd_export *)obj; @@ -1373,17 +1383,46 @@ int lprocfs_exp_rd_uuid(char *page, char 
**start, off_t off, int count, *eof = 1; page[0] = '\0'; - LASSERT(obd != NULL); - - cb_data.page = page; - cb_data.count = count; - cb_data.eof = eof; - cb_data.len = &len; + lprocfs_exp_rd_cb_data_init(&cb_data, page, count, eof, &len); lustre_hash_for_each_key(obd->obd_nid_hash, &stats->nid, lprocfs_exp_print_uuid, &cb_data); return (*cb_data.len); } +void lprocfs_exp_print_hash(void *obj, void *cb_data) +{ + struct exp_uuid_cb_data *data = cb_data; + struct obd_export *exp = obj; + lustre_hash_t *lh; + + lh = exp->exp_lock_hash; + if (lh) { + if (!*data->len) + *data->len += lustre_hash_debug_header(data->page, + data->count); + + *data->len += lustre_hash_debug_str(lh, data->page + *data->len, + data->count); + } +} + +int lprocfs_exp_rd_hash(char *page, char **start, off_t off, int count, + int *eof, void *data) +{ + struct nid_stat *stats = (struct nid_stat *)data; + struct exp_uuid_cb_data cb_data; + struct obd_device *obd = stats->nid_obd; + int len = 0; + + *eof = 1; + page[0] = '\0'; + lprocfs_exp_rd_cb_data_init(&cb_data, page, count, eof, &len); + + lustre_hash_for_each_key(obd->obd_nid_hash, &stats->nid, + lprocfs_exp_print_hash, &cb_data); + return (*cb_data.len); +} + int lprocfs_nid_stats_clear_read(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -1504,6 +1543,11 @@ int lprocfs_exp_setup(struct obd_export *exp, lnet_nid_t *nid, int *newnid) if (rc) CWARN("Error adding the uuid file\n"); + rc = lprocfs_add_simple(tmp->nid_proc, "hash", + lprocfs_exp_rd_hash, NULL, tmp); + if (rc) + CWARN("Error adding the hash file\n"); + exp->exp_nid_stats = tmp; *newnid = 1; RETURN(rc); diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 11f0bef9cb..f90afb1ef6 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -97,11 +97,17 @@ static int echo_disconnect(struct obd_export *exp) return class_disconnect(exp); } +static int echo_init_export(struct obd_export *exp) +{ + return ldlm_init_export(exp); +} + static int 
echo_destroy_export(struct obd_export *exp) { ENTRY; target_destroy_export(exp); + ldlm_destroy_export(exp); RETURN(0); } @@ -539,6 +545,7 @@ static struct obd_ops echo_obd_ops = { .o_owner = THIS_MODULE, .o_connect = echo_connect, .o_disconnect = echo_disconnect, + .o_init_export = echo_init_export, .o_destroy_export = echo_destroy_export, .o_create = echo_create, .o_destroy = echo_destroy, diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index e60e2b5ef8..49f209e3f3 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -596,7 +596,7 @@ static int filter_init_export(struct obd_export *exp) exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); - return 0; + return ldlm_init_export(exp); } static int filter_free_server_data(struct filter_obd *filter) @@ -2876,6 +2876,7 @@ static int filter_destroy_export(struct obd_export *exp) */ target_destroy_export(exp); + ldlm_destroy_export(exp); if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) RETURN(0); -- GitLab