Commit dcbb023c authored by Vitaly Fertman's avatar Vitaly Fertman Committed by Oleg Drokin
Browse files

LU-11276 ldlm: fix lock convert races



The blocking cb may be triggered in parallel and the convert logic
of the DOM lock must be ready that the cancel_bits could be already
zeroed by the first executor.

As there may be several blocking cb parallel executors and several
conversion callers, each requesting for different inode bits, setup
the following logic:
- the lock keeps the aggregated set of bits requested for cancelling
  by different parties, where 0 means the whole lock is to be
  cancelled, and where the CBPENDING flag means there is a canceling
  job pending;
- once completed, the cancel_bits are zeroed and the CBPENDING flag
  is dropped, meaning the next request will be a part of the next job;
- once a local lock is converted, its state is changed appropriately
  and no cleanup is left for the interpret time as the lock is ready
  for the next usage;
- as the lock is unlocked in a process of conversion and more bits
  may appear, check it and repeat appropriately;
- let just 1 conversion executor to work at a time, others are waiting
  similar to ldlm_cli_cancel();
- there are others who may want to cancel unused locks (cancel_lru,
  cancel_resource_local), consider CANCELING as a request to cancel
  the full lock independently of the cancel_bits;

Some cleanups are done:
- move the cache drop logic to the CANCELING part of the blocking cb
  from the BLOCKING one;
- remove the convert RPC interpret, as the lock cleanups are already
  done in advance; the convert RPC is re-sendable and an error means
  there is a serioes net problem;

Test-Parameters: testlist=racer,racer,racer
Signed-off-by: default avatarVitaly Fertman <c17818@cray.com>
Change-Id: I901de34241704ed801152f071cb7f610fe6f4bfe
Signed-off-by: default avatarMikhail Pershin <mpershin@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/39854

Tested-by: default avatarjenkins <devops@whamcloud.com>
Reviewed-by: default avatarJames Simmons <jsimmons@infradead.org>
Tested-by: default avatarMaloo <maloo@whamcloud.com>
Reviewed-by: default avatarVitaly Fertman <vitaly.fertman@hpe.com>
Reviewed-by: default avatarOleg Drokin <green@whamcloud.com>
parent 583c105e
......@@ -669,7 +669,6 @@ enum ldlm_cancel_flags {
LCF_ASYNC = 0x1, /* Cancel locks asynchronously. */
LCF_LOCAL = 0x2, /* Cancel locks locally, not notifing server */
LCF_BL_AST = 0x4, /* Cancel LDLM_FL_BL_AST locks in the same RPC */
LCF_CONVERT = 0x8, /* Try to convert IBITS lock before cancel */
};
struct ldlm_flock {
......@@ -1695,7 +1694,9 @@ int ldlm_cli_enqueue_local(const struct lu_env *env,
void *data, __u32 lvb_len, enum lvb_type lvb_type,
const __u64 *client_cookie,
struct lustre_handle *lockh);
int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags);
int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits);
int ldlm_cli_convert(struct ldlm_lock *lock,
enum ldlm_cancel_flags cancel_flags);
int ldlm_cli_update_pool(struct ptlrpc_request *req);
int ldlm_cli_cancel(const struct lustre_handle *lockh,
enum ldlm_cancel_flags cancel_flags);
......@@ -1721,8 +1722,8 @@ int ldlm_cli_cancel_list(struct list_head *head, int count,
enum ldlm_cancel_flags flags);
int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop);
int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits);
int ldlm_cli_dropbits_list(struct list_head *converts, __u64 drop_bits);
int ldlm_cli_inodebits_convert(struct ldlm_lock *lock,
enum ldlm_cancel_flags cancel_flags);
/** @} ldlm_cli_api */
......
......@@ -382,7 +382,10 @@ void ldlm_ibits_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
union ldlm_policy_data *lpolicy)
{
lpolicy->l_inodebits.bits = wpolicy->l_inodebits.bits;
lpolicy->l_inodebits.try_bits = wpolicy->l_inodebits.try_bits;
/**
* try_bits are to be handled outside of generic write_to_local due
* to different behavior on a server and client.
*/
}
void ldlm_ibits_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
......@@ -424,93 +427,103 @@ int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop)
EXPORT_SYMBOL(ldlm_inodebits_drop);
/* convert single lock */
int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits)
int ldlm_cli_inodebits_convert(struct ldlm_lock *lock,
enum ldlm_cancel_flags cancel_flags)
{
struct lustre_handle lockh;
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
struct ldlm_lock_desc ld = { { 0 } };
__u64 drop_bits, new_bits;
__u32 flags = 0;
int rc;
ENTRY;
LASSERT(drop_bits);
LASSERT(!lock->l_readers && !lock->l_writers);
check_res_locked(lock->l_resource);
LDLM_DEBUG(lock, "client lock convert START");
/* Lock is being converted already */
if (ldlm_is_converting(lock)) {
if (!(cancel_flags & LCF_ASYNC)) {
struct l_wait_info lwi = { 0 };
ldlm_lock2handle(lock, &lockh);
lock_res_and_lock(lock);
/* check if all bits are blocked */
if (!(lock->l_policy_data.l_inodebits.bits & ~drop_bits)) {
unlock_res_and_lock(lock);
/* return error to continue with cancel */
GOTO(exit, rc = -EINVAL);
unlock_res_and_lock(lock);
l_wait_event(lock->l_waitq,
is_lock_converted(lock), &lwi);
lock_res_and_lock(lock);
}
RETURN(0);
}
/* check if no common bits, consider this as successful convert */
if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) {
unlock_res_and_lock(lock);
GOTO(exit, rc = 0);
}
/* lru_cancel may happen in parallel and call ldlm_cli_cancel_list()
* independently.
*/
if (ldlm_is_canceling(lock))
RETURN(-EINVAL);
/* check if there is race with cancel */
if (ldlm_is_canceling(lock) || ldlm_is_cancel(lock)) {
unlock_res_and_lock(lock);
GOTO(exit, rc = -EINVAL);
}
/* no need in only local convert */
if (lock->l_flags & (LDLM_FL_LOCAL_ONLY | LDLM_FL_CANCEL_ON_BLOCK))
RETURN(-EINVAL);
/* clear cbpending flag early, it is safe to match lock right after
* client convert because it is downgrade always.
*/
ldlm_clear_cbpending(lock);
ldlm_clear_bl_ast(lock);
drop_bits = lock->l_policy_data.l_inodebits.cancel_bits;
/* no cancel bits - means that caller needs full cancel */
if (drop_bits == 0)
RETURN(-EINVAL);
new_bits = lock->l_policy_data.l_inodebits.bits & ~drop_bits;
/* check if all lock bits are dropped, proceed with cancel */
if (!new_bits)
RETURN(-EINVAL);
/* check if no dropped bits, consider this as successful convert */
if (lock->l_policy_data.l_inodebits.bits == new_bits)
RETURN(0);
/* If lock is being converted already, check drop bits first */
if (ldlm_is_converting(lock)) {
/* raced lock convert, lock inodebits are remaining bits
* so check if they are conflicting with new convert or not.
*/
if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) {
unlock_res_and_lock(lock);
GOTO(exit, rc = 0);
}
/* Otherwise drop new conflicting bits in new convert */
}
ldlm_set_converting(lock);
/* from all bits of blocking lock leave only conflicting */
drop_bits &= lock->l_policy_data.l_inodebits.bits;
/* save them in cancel_bits, so l_blocking_ast will know
* which bits from the current lock were dropped. */
lock->l_policy_data.l_inodebits.cancel_bits = drop_bits;
/* Finally clear these bits in lock ibits */
ldlm_inodebits_drop(lock, drop_bits);
unlock_res_and_lock(lock);
/* Finally call cancel callback for remaining bits only.
* It is important to have converting flag during that
* so blocking_ast callback can distinguish convert from
* cancels.
*/
if (lock->l_blocking_ast)
lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
LDLM_CB_CANCELING);
ld.l_policy_data.l_inodebits.cancel_bits = drop_bits;
unlock_res_and_lock(lock);
lock->l_blocking_ast(lock, &ld, lock->l_ast_data, LDLM_CB_CANCELING);
/* now notify server about convert */
rc = ldlm_cli_convert(lock, &flags);
if (rc) {
lock_res_and_lock(lock);
if (ldlm_is_converting(lock)) {
ldlm_clear_converting(lock);
ldlm_set_cbpending(lock);
ldlm_set_bl_ast(lock);
}
unlock_res_and_lock(lock);
GOTO(exit, rc);
}
EXIT;
exit:
LDLM_DEBUG(lock, "client lock convert END");
return rc;
}
rc = ldlm_cli_convert_req(lock, &flags, new_bits);
lock_res_and_lock(lock);
if (rc)
GOTO(full_cancel, rc);
/* Finally clear these bits in lock ibits */
ldlm_inodebits_drop(lock, drop_bits);
/* Being locked again check if lock was canceled, it is important
* to do and don't drop cbpending below
*/
if (ldlm_is_canceling(lock))
GOTO(full_cancel, rc = -EINVAL);
/* also check again if more bits to be cancelled appeared */
if (drop_bits != lock->l_policy_data.l_inodebits.cancel_bits)
GOTO(clear_converting, rc = -EAGAIN);
/* clear cbpending flag early, it is safe to match lock right after
* client convert because it is downgrade always.
*/
ldlm_clear_cbpending(lock);
ldlm_clear_bl_ast(lock);
spin_lock(&ns->ns_lock);
if (list_empty(&lock->l_lru))
ldlm_lock_add_to_lru_nolock(lock);
spin_unlock(&ns->ns_lock);
/* the job is done, zero the cancel_bits. If more conflicts appear,
* it will result in another cycle of ldlm_cli_inodebits_convert().
*/
full_cancel:
lock->l_policy_data.l_inodebits.cancel_bits = 0;
clear_converting:
ldlm_clear_converting(lock);
RETURN(rc);
}
int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock)
{
......
......@@ -194,6 +194,7 @@ int ldlm_bl_thread_wakeup(void);
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
void ldlm_bl_desc2lock(const struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
#ifdef HAVE_SERVER_SUPPORT
/* ldlm_plain.c */
......@@ -388,6 +389,17 @@ static inline bool is_bl_done(struct ldlm_lock *lock)
return bl_done;
}
static inline bool is_lock_converted(struct ldlm_lock *lock)
{
bool ret = 0;
lock_res_and_lock(lock);
ret = (lock->l_policy_data.l_inodebits.cancel_bits == 0);
unlock_res_and_lock(lock);
return ret;
}
typedef void (*ldlm_policy_wire_to_local_t)(const union ldlm_wire_policy_data *,
union ldlm_policy_data *);
typedef void (*ldlm_policy_local_to_wire_t)(const union ldlm_policy_data *,
......
......@@ -1295,6 +1295,9 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
&lock->l_policy_data);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
lock->l_req_extent = lock->l_policy_data.l_extent;
else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS)
lock->l_policy_data.l_inodebits.try_bits =
dlm_req->lock_desc.l_policy_data.l_inodebits.try_bits;
existing_lock:
if (flags & LDLM_FL_HAS_INTENT) {
......@@ -1515,6 +1518,8 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
struct obd_export *exp = req->rq_export;
struct ldlm_reply *dlm_rep;
struct ldlm_lock *lock;
__u64 bits;
__u64 new_bits;
int rc;
ENTRY;
......@@ -1531,60 +1536,61 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
dlm_rep->lock_flags = dlm_req->lock_flags;
lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
if (lock) {
__u64 bits;
__u64 new;
if (!lock) {
LDLM_DEBUG_NOLOCK("server lock is canceled already");
req->rq_status = ELDLM_NO_LOCK_DATA;
RETURN(0);
}
bits = lock->l_policy_data.l_inodebits.bits;
new = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
LDLM_DEBUG(lock, "server-side convert handler START");
LDLM_DEBUG(lock, "server-side convert handler START");
if (ldlm_is_cancel(lock)) {
LDLM_ERROR(lock, "convert on canceled lock!");
rc = ELDLM_NO_LOCK_DATA;
} else if (dlm_req->lock_desc.l_req_mode !=
lock->l_granted_mode) {
LDLM_ERROR(lock, "lock mode differs!");
rc = ELDLM_NO_LOCK_DATA;
} else if (bits == new) {
/* This can be valid situation if CONVERT RPCs are
* re-ordered. Just finish silently */
LDLM_DEBUG(lock, "lock is converted already!");
rc = ELDLM_OK;
} else {
lock_res_and_lock(lock);
if (ldlm_is_waited(lock))
ldlm_del_waiting_lock(lock);
lock_res_and_lock(lock);
bits = lock->l_policy_data.l_inodebits.bits;
new_bits = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
ldlm_clear_cbpending(lock);
lock->l_policy_data.l_inodebits.cancel_bits = 0;
ldlm_inodebits_drop(lock, bits & ~new);
if (ldlm_is_cancel(lock)) {
LDLM_DEBUG(lock, "convert on canceled lock!");
unlock_res_and_lock(lock);
GOTO(out_put, rc = ELDLM_NO_LOCK_DATA);
}
ldlm_clear_blocking_data(lock);
unlock_res_and_lock(lock);
if (dlm_req->lock_desc.l_req_mode != lock->l_granted_mode) {
LDLM_ERROR(lock, "lock mode differs!");
unlock_res_and_lock(lock);
GOTO(out_put, rc = -EPROTO);
}
ldlm_reprocess_all(lock->l_resource, NULL);
rc = ELDLM_OK;
}
if (bits == new_bits) {
/*
* This can be valid situation if CONVERT RPCs are
* re-ordered. Just finish silently
*/
LDLM_DEBUG(lock, "lock is converted already!");
unlock_res_and_lock(lock);
} else {
if (ldlm_is_waited(lock))
ldlm_del_waiting_lock(lock);
if (rc == ELDLM_OK) {
dlm_rep->lock_handle = lock->l_remote_handle;
ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
&dlm_rep->lock_desc.l_policy_data);
}
ldlm_clear_cbpending(lock);
lock->l_policy_data.l_inodebits.cancel_bits = 0;
ldlm_inodebits_drop(lock, bits & ~new_bits);
LDLM_DEBUG(lock, "server-side convert handler END, rc = %d",
rc);
LDLM_LOCK_PUT(lock);
} else {
rc = ELDLM_NO_LOCK_DATA;
LDLM_DEBUG_NOLOCK("server-side convert handler END, rc = %d",
rc);
ldlm_clear_blocking_data(lock);
unlock_res_and_lock(lock);
ldlm_reprocess_all(lock->l_resource, NULL);
}
dlm_rep->lock_handle = lock->l_remote_handle;
ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
&dlm_rep->lock_desc.l_policy_data);
rc = ELDLM_OK;
EXIT;
out_put:
LDLM_DEBUG(lock, "server-side convert handler END, rc = %d", rc);
LDLM_LOCK_PUT(lock);
req->rq_status = rc;
RETURN(0);
return 0;
}
/**
......@@ -1717,37 +1723,61 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
#endif /* HAVE_SERVER_SUPPORT */
/**
* Callback handler for receiving incoming blocking ASTs.
*
* This can only happen on client side.
* Server may pass additional information about blocking lock.
* For IBITS locks it is conflicting bits which can be used for
* lock convert instead of cancel.
*/
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
void ldlm_bl_desc2lock(const struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
int do_ast;
ENTRY;
LDLM_DEBUG(lock, "client blocking AST callback handler");
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
lock_res_and_lock(lock);
/* set bits to cancel for this lock for possible lock convert */
if (ns_is_client(ns) && (lock->l_resource->lr_type == LDLM_IBITS)) {
/* Lock description contains policy of blocking lock,
check_res_locked(lock->l_resource);
if (ns_is_client(ns) && ld &&
(lock->l_resource->lr_type == LDLM_IBITS)) {
/*
* Lock description contains policy of blocking lock,
* and its cancel_bits is used to pass conflicting bits.
* NOTE: ld can be NULL or can be not NULL but zeroed if
* passed from ldlm_bl_thread_blwi(), check below used bits
* in ld to make sure it is valid description.
*/
if (ld && ld->l_policy_data.l_inodebits.bits)
lock->l_policy_data.l_inodebits.cancel_bits =
if (ld->l_policy_data.l_inodebits.cancel_bits &&
ldlm_res_eq(&ld->l_resource.lr_name,
&lock->l_resource->lr_name) &&
!(ldlm_is_cbpending(lock) &&
lock->l_policy_data.l_inodebits.cancel_bits == 0)) {
/* always combine conflicting ibits */
lock->l_policy_data.l_inodebits.cancel_bits |=
ld->l_policy_data.l_inodebits.cancel_bits;
/* if there is no valid ld and lock is cbpending already
* then cancel_bits should be kept, otherwise it is zeroed.
*/
else if (!ldlm_is_cbpending(lock))
} else {
/* If cancel_bits are not obtained or
* if the lock is already CBPENDING and
* has no cancel_bits set
* - the full lock is to be cancelled
*/
lock->l_policy_data.l_inodebits.cancel_bits = 0;
}
}
}
/**
* Callback handler for receiving incoming blocking ASTs.
*
* This can only happen on client side.
*/
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
int do_ast;
ENTRY;
LDLM_DEBUG(lock, "client blocking AST callback handler");
lock_res_and_lock(lock);
/* get extra information from desc if any */
ldlm_bl_desc2lock(ld, lock);
ldlm_set_cbpending(lock);
if (ldlm_is_cancel_on_block(lock))
......@@ -1862,10 +1892,13 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
ldlm_resource_unlink_lock(lock);
if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
/* BL_AST locks are not needed in LRU.
* Let ldlm_cancel_lru() be fast. */
ldlm_lock_remove_from_lru(lock);
if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
/*
* BL_AST locks are not needed in LRU.
* Let ldlm_cancel_lru() be fast.
*/
ldlm_lock_remove_from_lru(lock);
ldlm_bl_desc2lock(&dlm_req->lock_desc, lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
LDLM_DEBUG(lock, "completion AST includes blocking AST");
}
......@@ -1916,10 +1949,12 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
struct ldlm_request *dlm_req,
struct ldlm_lock *lock)
{
int rc = -ENOSYS;
ENTRY;
struct ldlm_lock_desc *ld = &dlm_req->lock_desc;
int rc = -ENOSYS;
ENTRY;
LDLM_DEBUG(lock, "client glimpse AST callback handler");
LDLM_DEBUG(lock, "client glimpse AST callback handler");
if (lock->l_glimpse_ast != NULL)
rc = lock->l_glimpse_ast(lock, req);
......@@ -1937,9 +1972,16 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
ktime_after(ktime_get(),
ktime_add(lock->l_last_used,
ktime_set(ns->ns_dirty_age_limit, 0)))) {
unlock_res_and_lock(lock);
if (ldlm_bl_to_thread_lock(ns, NULL, lock))
ldlm_handle_bl_callback(ns, NULL, lock);
unlock_res_and_lock(lock);
/* For MDS glimpse it is always DOM lock, set corresponding
* cancel_bits to perform lock convert if needed
*/
if (lock->l_resource->lr_type == LDLM_IBITS)
ld->l_policy_data.l_inodebits.cancel_bits =
MDS_INODELOCK_DOM;
if (ldlm_bl_to_thread_lock(ns, ld, lock))
ldlm_handle_bl_callback(ns, ld, lock);
EXIT;
return;
......
......@@ -702,6 +702,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
if ((*flags) & LDLM_FL_AST_SENT) {
lock_res_and_lock(lock);
ldlm_bl_desc2lock(&reply->lock_desc, lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
unlock_res_and_lock(lock);
LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
......@@ -1054,120 +1055,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
}
EXPORT_SYMBOL(ldlm_cli_enqueue);
/**
* Client-side lock convert reply handling.
*
* Finish client lock converting, checks for concurrent converts
* and clear 'converting' flag so lock can be placed back into LRU.
*/
static int lock_convert_interpret(const struct lu_env *env,
struct ptlrpc_request *req,
struct ldlm_async_args *aa, int rc)
{
struct ldlm_lock *lock;
struct ldlm_reply *reply;
ENTRY;
lock = ldlm_handle2lock(&aa->lock_handle);
if (!lock) {
LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
aa->lock_handle.cookie);
RETURN(-ESTALE);
}
LDLM_DEBUG(lock, "CONVERTED lock:");
if (rc != ELDLM_OK)
GOTO(out, rc);
reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
if (reply == NULL)
GOTO(out, rc = -EPROTO);
if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
LDLM_ERROR(lock, "convert ACK with wrong lock cookie %#llx"
" but cookie %#llx from server %s id %s\n",
aa->lock_handle.cookie, reply->lock_handle.cookie,
req->rq_export->exp_client_uuid.uuid,
libcfs_id2str(req->rq_peer));
GOTO(out, rc = ELDLM_NO_LOCK_DATA);
}
lock_res_and_lock(lock);
/* Lock convert is sent for any new bits to drop, the converting flag
* is dropped when ibits on server are the same as on client. Meanwhile
* that can be so that more later convert will be replied first with
* and clear converting flag, so in case of such race just exit here.
* if lock has no converting bits then */
if (!ldlm_is_converting(lock)) {
LDLM_DEBUG(lock, "convert ACK for lock without converting flag,"
" reply ibits %#llx",
reply->lock_desc.l_policy_data.l_inodebits.bits);
} else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
lock->l_policy_data.l_inodebits.bits) {
/* Compare server returned lock ibits and local lock ibits
* if they are the same we consider convertion is done,
* otherwise we have more converts inflight and keep
* converting flag.
*/
LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
reply->lock_desc.l_policy_data.l_inodebits.bits);
} else {
ldlm_clear_converting(lock);
/* Concurrent BL AST may arrive and cause another convert
* or cancel so just do nothing here if bl_ast is set,
* finish with convert otherwise.
*/
if (!ldlm_is_bl_ast(lock)) {
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
/* Drop cancel_bits since there are no more converts
* and put lock into LRU if it is still not used and
* is not there yet.
*/
lock->l_policy_data.l_inodebits.cancel_bits = 0;
if (!lock->l_readers && !lock->l_writers &&
!ldlm_is_canceling(lock)) {
spin_lock(&ns->ns_lock);
/* there is check for list_empty() inside */
ldlm_lock_remove_from_lru_nolock(lock);
ldlm_lock_add_to_lru_nolock(lock);
spin_unlock(&ns->ns_lock);
}
}
}
unlock_res_and_lock(lock);
out:
if (rc) {
int flag;
lock_res_and_lock(lock);
if (ldlm_is_converting(lock)) {
ldlm_clear_converting(lock);
ldlm_set_cbpending(lock);
ldlm_set_bl_ast(lock);
lock->l_policy_data.l_inodebits.cancel_bits = 0;
}
unlock_res_and_lock(lock);
/* fallback to normal lock cancel. If rc means there is no