diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index a0aae86dfa05ffc41ca90247c1781c92edd4bde2..8f8fe8d65337a20eb060978e848a6667783e12b5 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -37,35 +37,42 @@ typedef enum { #define LDLM_NAMESPACE_SERVER 0 #define LDLM_NAMESPACE_CLIENT 1 -#define LDLM_FL_LOCK_CHANGED (1 << 0) /* extent, mode, or resource changed */ +#define LDLM_FL_LOCK_CHANGED 0x000001 /* extent, mode, or resource changed */ /* If the server returns one of these flags, then the lock was put on that list. * If the client sends one of these flags (during recovery ONLY!), it wants the * lock added to the specified list, no questions asked. -p */ -#define LDLM_FL_BLOCK_GRANTED (1 << 1) -#define LDLM_FL_BLOCK_CONV (1 << 2) -#define LDLM_FL_BLOCK_WAIT (1 << 3) +#define LDLM_FL_BLOCK_GRANTED 0x000002 +#define LDLM_FL_BLOCK_CONV 0x000004 +#define LDLM_FL_BLOCK_WAIT 0x000008 -#define LDLM_FL_CBPENDING (1 << 4) // this lock is being destroyed -#define LDLM_FL_AST_SENT (1 << 5) // blocking or cancel packet was sent -#define LDLM_FL_WAIT_NOREPROC (1 << 6)// not a real lock flag,not saved in lock -#define LDLM_FL_CANCEL (1 << 7) // cancellation callback already run +#define LDLM_FL_CBPENDING 0x000010 /* this lock is being destroyed */ +#define LDLM_FL_AST_SENT 0x000020 /* blocking or cancel packet was sent */ +#define LDLM_FL_WAIT_NOREPROC 0x000040 /* not a real flag, not saved in lock */ +#define LDLM_FL_CANCEL 0x000080 /* cancellation callback already run */ /* Lock is being replayed. This could probably be implied by the fact that one * of BLOCK_{GRANTED,CONV,WAIT} is set, but that is pretty dangerous. */ -#define LDLM_FL_REPLAY (1 << 8) +#define LDLM_FL_REPLAY 0x000100 -#define LDLM_FL_INTENT_ONLY (1 << 9) /* don't grant lock, just do intent */ -#define LDLM_FL_LOCAL_ONLY (1 << 10) /* see ldlm_cli_cancel_unused */ +#define LDLM_FL_INTENT_ONLY 0x000200 /* don't grant lock, just do intent */ +#define LDLM_FL_LOCAL_ONLY 0x000400 /* see ldlm_cli_cancel_unused */ /* don't run the cancel callback under ldlm_cli_cancel_unused */ -#define LDLM_FL_NO_CALLBACK (1 << 11) +#define LDLM_FL_NO_CALLBACK 0x000800 -#define LDLM_FL_HAS_INTENT (1 << 12) /* lock request has intent */ -#define LDLM_FL_CANCELING (1 << 13) /* lock cancel has already been sent */ -#define LDLM_FL_LOCAL (1 << 14) // a local lock (ie, no srv/cli split) -#define LDLM_FL_WARN (1 << 15) /* see ldlm_cli_cancel_unused */ -#define LDLM_FL_MATCH_DATA (1 << 16) /* see ldlm_lock_match */ +#define LDLM_FL_HAS_INTENT 0x001000 /* lock request has intent */ +#define LDLM_FL_CANCELING 0x002000 /* lock cancel has already been sent */ +#define LDLM_FL_LOCAL 0x004000 /* local lock (ie, no srv/cli split) */ +#define LDLM_FL_WARN 0x008000 /* see ldlm_cli_cancel_unused */ +#define LDLM_FL_DISCARD_DATA 0x010000 /* discard (no writeback) on cancel */ +#define LDLM_FL_MATCH_DATA 0x020000 /* see ldlm_lock_match */ + +/* These are flags that are mapped into the flags and ASTs of blocking locks */ +#define LDLM_AST_DISCARD_DATA 0x80000000 /* Add FL_DISCARD to blocking ASTs */ + +/* Flags sent in AST lock_flags to be mapped into the receiving lock. */ +#define LDLM_AST_FLAGS (LDLM_FL_DISCARD_DATA) /* The blocking callback is overloaded to perform two functions. These flags * indicate which operation should be performed. */ @@ -248,20 +255,20 @@ do { \ if (lock->l_resource == NULL) { \ CDEBUG(level, "### " format \ " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "\ - "res: \?\? rrc=\?\? type: \?\?\? remote: " \ + "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: " \ LPX64"\n" , ## a, lock, lock->l_handle.h_cookie, \ atomic_read(&lock->l_refc), \ lock->l_readers, lock->l_writers, \ ldlm_lockname[lock->l_granted_mode], \ ldlm_lockname[lock->l_req_mode], \ - lock->l_remote_handle.cookie); \ + lock->l_flags, lock->l_remote_handle.cookie); \ break; \ } \ if (lock->l_resource->lr_type == LDLM_EXTENT) { \ CDEBUG(level, "### " format \ " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \ "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64\ - "] remote: "LPX64"\n" , ## a, \ + "] flags: %x remote: "LPX64"\n" , ## a, \ lock->l_resource->lr_namespace->ns_name, lock, \ lock->l_handle.h_cookie, atomic_read(&lock->l_refc), \ lock->l_readers, lock->l_writers, \ @@ -272,14 +279,14 @@ do { \ atomic_read(&lock->l_resource->lr_refcount), \ ldlm_typename[lock->l_resource->lr_type], \ lock->l_extent.start, lock->l_extent.end, \ - lock->l_remote_handle.cookie); \ + lock->l_flags, lock->l_remote_handle.cookie); \ break; \ } \ { \ CDEBUG(level, "### " format \ " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \ - "res: "LPU64"/"LPU64" rrc: %d type: %s remote: "LPX64 \ - "\n" , ## a, \ + "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x " \ + "remote: "LPX64"\n" , ## a, \ lock->l_resource->lr_namespace->ns_name, \ lock, lock->l_handle.h_cookie, \ atomic_read (&lock->l_refc), \ @@ -290,7 +297,7 @@ do { \ lock->l_resource->lr_name.name[1], \ atomic_read(&lock->l_resource->lr_refcount), \ ldlm_typename[lock->l_resource->lr_type], \ - lock->l_remote_handle.cookie); \ + lock->l_flags, lock->l_remote_handle.cookie); \ } \ } while (0) @@ -306,7 +313,7 @@ do { \ */ #define LDLM_ITER_CONTINUE 1 /* keep iterating */ -#define LDLM_ITER_STOP 0 /* stop iterating */ +#define LDLM_ITER_STOP 2 /* stop iterating */ typedef int (*ldlm_iterator_t)(struct ldlm_lock *, void *); typedef int (*ldlm_res_iterator_t)(struct ldlm_resource *, void *); @@ -374,18 +381,9 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode); void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode); -void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen); int ldlm_lock_match(struct ldlm_namespace *ns, int flags, struct ldlm_res_id *, __u32 type, void *cookie, int cookielen, ldlm_mode_t mode, void *data, struct lustre_handle *); -struct ldlm_lock * -ldlm_lock_create(struct ldlm_namespace *ns, - struct lustre_handle *parent_lock_handle, struct ldlm_res_id, - __u32 type, ldlm_mode_t, ldlm_blocking_callback, - void *data); -ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **, - void *cookie, int cookie_len, int *flags, - ldlm_completion_callback completion); struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, int *flags); void ldlm_lock_cancel(struct ldlm_lock *lock); diff --git a/lustre/ldlm/l_lock.c b/lustre/ldlm/l_lock.c index c439eed9b41888a2257afed51fc9d328307f1b3c..ebf49d6f246056507097c80f893f4b5eee024ddf 100644 --- a/lustre/ldlm/l_lock.c +++ b/lustre/ldlm/l_lock.c @@ -46,7 +46,6 @@ #endif #include <linux/lustre_dlm.h> -#include <linux/obd_class.h> #include <linux/lustre_lib.h> /* invariants: diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index f6a9f5e775557ffe39d03b08237039edd8a45544..08ee7faeb626bc531ebf2111b0d8ce846cca43f4 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -40,6 +40,8 @@ int ldlm_extent_compat(struct ldlm_lock *a, struct ldlm_lock *b) RETURN(1); } +#include "ldlm_internal.h" + /* The purpose of this function is to return: * - the maximum extent * - containing the requested extent diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index b8bfdac4474179c5b1261bc46a6cd48de36fa747..5aca55fdec46f8c6a16e8b5a77445d27f4c91652 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -1 +1,13 @@ +/* ldlm_request.c */ int ldlm_cancel_lru(struct ldlm_namespace *ns); + +/* ldlm_lock.c */ +void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen, + int run_ast); +struct ldlm_lock * +ldlm_lock_create(struct ldlm_namespace *ns, + struct lustre_handle *parent_lock_handle, struct ldlm_res_id, + __u32 type, ldlm_mode_t, ldlm_blocking_callback, + ldlm_completion_callback, void *data); +ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **, + void *cookie, int cookie_len, int *flags); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 4b056557e8490a9a27c3d78a0b3bb6f8b17ba7c0..a1b5faad8755e6f158ef50a47b67a1a9da146dc4 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -27,7 +27,6 @@ # include <linux/slab.h> # include <linux/module.h> # include <linux/lustre_dlm.h> -# include <linux/lustre_mds.h> #else # include <liblustre.h> # include <linux/kp30.h> @@ -410,6 +409,8 @@ static void ldlm_add_ast_work_item(struct ldlm_lock *lock, if (new && (lock->l_flags & LDLM_FL_AST_SENT)) GOTO(out, 0); + CDEBUG(D_OTHER, "lock %p incompatible; sending blocking AST.\n", lock); + OBD_ALLOC(w, sizeof(*w)); if (!w) { LBUG(); @@ -565,11 +566,8 @@ static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs, rc = 0; - if (send_cbs && child->l_blocking_ast != NULL) { - CDEBUG(D_OTHER, "lock %p incompatible; sending " - "blocking AST.\n", child); + if (send_cbs && child->l_blocking_ast != NULL) ldlm_add_ast_work_item(child, lock, NULL, 0); - } } return rc; @@ -597,7 +595,8 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs) * - ldlm_reprocess_queue * - ldlm_lock_convert */ -void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen) +void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen, + int run_ast) { struct ldlm_resource *res = lock->l_resource; ENTRY; @@ -609,7 +608,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen) if (lock->l_granted_mode < res->lr_most_restr) res->lr_most_restr = lock->l_granted_mode; - if (lock->l_completion_ast != NULL) + if (run_ast && lock->l_completion_ast != NULL) ldlm_add_ast_work_item(lock, NULL, data, datalen); l_unlock(&lock->l_resource->lr_namespace->ns_lock); @@ -764,6 +763,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, struct ldlm_res_id res_id, __u32 type, ldlm_mode_t mode, ldlm_blocking_callback blocking, + ldlm_completion_callback completion, void *data) { struct ldlm_resource *res, *parent_res = NULL; @@ -791,15 +791,14 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, lock->l_req_mode = mode; lock->l_data = data; lock->l_blocking_ast = blocking; + lock->l_completion_ast = completion; RETURN(lock); } ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, struct ldlm_lock **lockp, - void *cookie, int cookie_len, - int *flags, - ldlm_completion_callback completion) + void *cookie, int cookie_len, int *flags) { struct ldlm_resource *res; struct ldlm_lock *lock = *lockp; @@ -863,7 +862,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); else - ldlm_grant_lock(lock, NULL, 0); + ldlm_grant_lock(lock, NULL, 0, 0); GOTO(out, ELDLM_OK); } else if (*flags & LDLM_FL_REPLAY) { if (*flags & LDLM_FL_BLOCK_CONV) { @@ -873,7 +872,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, ldlm_resource_add_lock(res, &res->lr_waiting, lock); GOTO(out, ELDLM_OK); } else if (*flags & LDLM_FL_BLOCK_GRANTED) { - ldlm_grant_lock(lock, NULL, 0); + ldlm_grant_lock(lock, NULL, 0, 0); GOTO(out, ELDLM_OK); } /* If no flags, fall through to normal enqueue path. */ @@ -895,12 +894,9 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, *flags |= LDLM_FL_BLOCK_GRANTED; GOTO(out, ELDLM_OK); } - ldlm_grant_lock(lock, NULL, 0); + ldlm_grant_lock(lock, NULL, 0, 0); EXIT; out: - /* Don't set 'completion_ast' until here so that if the lock is granted - * immediately we don't do an unnecessary completion call. */ - lock->l_completion_ast = completion; l_unlock(&ns->ns_lock); return ELDLM_OK; } @@ -922,7 +918,7 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res, RETURN(1); list_del_init(&pending->l_res_link); - ldlm_grant_lock(pending, NULL, 0); + ldlm_grant_lock(pending, NULL, 0, 1); } RETURN(0); @@ -1034,6 +1030,9 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) struct ldlm_namespace *ns; ENTRY; + /* There's no race between calling this and taking the ns lock below; + * a lock can only be put on the waiting list once, because it can only + * issue a blocking AST once. */ ldlm_del_waiting_lock(lock); res = lock->l_resource; @@ -1118,7 +1117,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, LBUG(); res->lr_tmp = &rpc_list; - ldlm_grant_lock(lock, NULL, 0); + ldlm_grant_lock(lock, NULL, 0, 0); res->lr_tmp = NULL; granted = 1; /* FIXME: completion handling not with ns_lock held ! */ diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 016a471bccf1456b9d32c90f2a8f7bff6ff448fa..9350539e9d3bbdc285ff8d905423cda8afdfdf58 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -21,7 +21,9 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define EXPORT_SYMTAB +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif #define DEBUG_SUBSYSTEM S_LDLM #ifdef __KERNEL__ @@ -35,6 +37,8 @@ #include <linux/lustre_dlm.h> #include <linux/obd_class.h> +#include "ldlm_internal.h" + extern kmem_cache_t *ldlm_resource_slab; extern kmem_cache_t *ldlm_lock_slab; extern struct lustre_lock ldlm_handle_lock; @@ -454,7 +458,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, dlm_req->lock_desc.l_resource.lr_name, dlm_req->lock_desc.l_resource.lr_type, dlm_req->lock_desc.l_req_mode, - blocking_callback, NULL); + blocking_callback, completion_callback, NULL); if (!lock) GOTO(out, err = -ENOMEM); @@ -471,7 +475,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, l_unlock(&lock->l_resource->lr_namespace->ns_lock); err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen, - &flags, completion_callback); + &flags); if (err) GOTO(out, err); @@ -665,7 +669,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_DEBUG(lock, "completion AST, new resource"); } lock->l_resource->lr_tmp = &ast_list; - ldlm_grant_lock(lock, req, sizeof(*req)); + ldlm_grant_lock(lock, req, sizeof(*req), 1); lock->l_resource->lr_tmp = NULL; LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work"); l_unlock(&ns->ns_lock); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index aac4213cfb3eec7f53d0788a349e3f45c545db53..4c21bc676bdddccf0499ebac812b578aa9925d6d 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -29,6 +29,8 @@ #include <linux/obd_class.h> #include <linux/obd.h> +#include "ldlm_internal.h" + static void interrupted_completion_wait(void *data) { } @@ -146,7 +148,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, } lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, - blocking, data); + blocking, completion, data); if (!lock) GOTO(out_nolock, err = -ENOMEM); LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created"); @@ -155,8 +157,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_lock2handle(lock, lockh); lock->l_flags |= LDLM_FL_LOCAL; - err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags, - completion); + err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags); if (err != ELDLM_OK) GOTO(out, err); @@ -218,14 +219,9 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, LASSERT(connh == lock->l_connh); } else { lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, - mode, blocking, data); + mode, blocking, completion, data); if (lock == NULL) GOTO(out_nolock, rc = -ENOMEM); - /* ugh. I set this early (instead of waiting for _enqueue) - * because the completion AST might arrive early, and we need - * (in just this one case) to run the completion_cb even if it - * arrives before the reply. */ - lock->l_completion_ast = completion; LDLM_DEBUG(lock, "client-side enqueue START"); /* for the local lock, add the reference */ ldlm_lock_addref_internal(lock, mode); @@ -344,11 +340,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, } if (!is_replay) { - l_lock(&ns->ns_lock); - lock->l_completion_ast = NULL; - rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags, - completion); - l_unlock(&ns->ns_lock); + rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags); if (lock->l_completion_ast) lock->l_completion_ast(lock, *flags, NULL); } @@ -813,7 +805,6 @@ void ldlm_change_cbdata(struct ldlm_namespace *ns, void *data) { struct ldlm_resource *res; - int rc = 0; ENTRY; if (ns == NULL) { @@ -828,7 +819,7 @@ void ldlm_change_cbdata(struct ldlm_namespace *ns, } l_lock(&ns->ns_lock); - rc = ldlm_resource_foreach(res, iter, data); + ldlm_resource_foreach(res, iter, data); l_unlock(&ns->ns_lock); ldlm_resource_putref(res); EXIT;