From 02c7db2fcd335cf002fe2bf4d24e585a6438cf29 Mon Sep 17 00:00:00 2001 From: shadow <shadow> Date: Thu, 4 Oct 2007 06:34:34 +0000 Subject: [PATCH] revert fix from bug 13563, because it produce oops in tests 24o and 51. b=13723 --- lustre/ldlm/ldlm_internal.h | 7 +-- lustre/ldlm/ldlm_lock.c | 2 +- lustre/ldlm/ldlm_lockd.c | 63 +++++++++------------------ lustre/ldlm/ldlm_request.c | 85 +++++++++++++++++-------------------- 4 files changed, 62 insertions(+), 95 deletions(-) diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 2c5335c5e0..440a21e857 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -87,11 +87,8 @@ void ldlm_lock_touch_in_lru(struct ldlm_lock *lock); void ldlm_lock_destroy_nolock(struct ldlm_lock *lock); /* ldlm_lockd.c */ -int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct ldlm_lock *lock); -int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct list_head *cancels, int count); - +int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, + struct ldlm_lock *lock, int flags); void ldlm_handle_bl_callback(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, struct ldlm_lock *lock); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index d885fd4b76..4e62e68e9a 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -630,7 +630,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) ldlm_lock_remove_from_lru(lock); unlock_res_and_lock(lock); if ((lock->l_flags & LDLM_FL_ATOMIC_CB) || - ldlm_bl_to_thread_lock(ns, NULL, lock) != 0) + ldlm_bl_to_thread(ns, NULL, lock, 0) != 0) ldlm_handle_bl_callback(ns, NULL, lock); } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT && !lock->l_readers && !lock->l_writers && diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index f55e74d59f..39c37ef5f6 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -94,8 +94,7 @@ struct ldlm_bl_work_item { struct ldlm_namespace *blwi_ns; struct ldlm_lock_desc blwi_ld; struct ldlm_lock *blwi_lock; - struct list_head blwi_head; - int blwi_count; + int blwi_flags; }; #ifdef __KERNEL__ @@ -1336,7 +1335,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req, cfs_time_add(lock->l_last_used, cfs_time_seconds(10)))) { unlock_res_and_lock(lock); - if (ldlm_bl_to_thread_lock(ns, NULL, lock)) + if (ldlm_bl_to_thread(ns, NULL, lock, 0)) ldlm_handle_bl_callback(ns, NULL, lock); EXIT; @@ -1357,19 +1356,14 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) } return ptlrpc_reply(req); } - -#ifdef __KERNEL__ -static int ldlm_bl_to_thread(struct ldlm_namespace *ns, - struct ldlm_lock_desc *ld, struct ldlm_lock *lock, - struct list_head *cancels, int count) +int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, + struct ldlm_lock *lock, int flags) { +#ifdef __KERNEL__ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; struct ldlm_bl_work_item *blwi; ENTRY; - if (cancels && count == 0) - RETURN(0); - OBD_ALLOC(blwi, sizeof(*blwi)); if (blwi == NULL) RETURN(-ENOMEM); @@ -1377,37 +1371,15 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns, blwi->blwi_ns = ns; if (ld != NULL) blwi->blwi_ld = *ld; - if (count) { - list_add(&blwi->blwi_head, cancels); - list_del_init(cancels); - blwi->blwi_count = count; - } else { - blwi->blwi_lock = lock; - } + blwi->blwi_lock = lock; + blwi->blwi_flags = flags; + spin_lock(&blp->blp_lock); list_add_tail(&blwi->blwi_entry, &blp->blp_list); cfs_waitq_signal(&blp->blp_waitq); spin_unlock(&blp->blp_lock); RETURN(0); -} -#endif - -int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct ldlm_lock *lock) -{ -#ifdef __KERNEL__ - RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0)); -#else - RETURN(-ENOSYS); -#endif -} - -int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct list_head *cancels, int count) -{ -#ifdef __KERNEL__ - RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count)); #else RETURN(-ENOSYS); #endif @@ -1556,7 +1528,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) CDEBUG(D_INODE, "blocking ast\n"); if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) ldlm_callback_reply(req, 0); - if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock)) + if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0)) ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock); break; case LDLM_CP_CALLBACK: @@ -1676,13 +1648,18 @@ static int ldlm_bl_thread_main(void *arg) if (blwi->blwi_ns == NULL) break; - if (blwi->blwi_count) { + if (blwi->blwi_flags == LDLM_FL_CANCELING) { /* The special case when we cancel locks in lru - * asynchronously, we pass the list of locks here. - * Thus lock is marked LDLM_FL_CANCELING, and already - * canceled locally. */ - ldlm_cli_cancel_list(&blwi->blwi_head, - blwi->blwi_count, NULL, 0); + * asynchronously, then we first remove the lock from + * l_bl_ast explicitely in ldlm_cancel_lru before + * sending it to this thread. Thus lock is marked + * LDLM_FL_CANCELING, and already cancelled locally. */ + CFS_LIST_HEAD(head); + LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast)); + list_add(&blwi->blwi_lock->l_bl_ast, &head); + ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export, + &head, 1); + LDLM_LOCK_PUT(blwi->blwi_lock); } else { ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld, blwi->blwi_lock); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index dd03ac1656..cb8c6e4e5d 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -469,28 +469,18 @@ cleanup: /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into * a single page on the send/receive side. XXX: 512 should be changed * to more adequate value. */ -static inline int ldlm_req_handles_avail(struct obd_export *exp, - int *size, int bufcount, int off) -{ - int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); - int old_size = size[DLM_LOCKREQ_OFF]; - - size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); - avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, - bufcount, size); - avail /= sizeof(struct lustre_handle); - avail += LDLM_LOCKREQ_HANDLES - off; - size[DLM_LOCKREQ_OFF] = old_size; - - return avail; -} - -static inline int ldlm_cancel_handles_avail(struct obd_export *exp) -{ - int size[2] = { sizeof(struct ptlrpc_body), - sizeof(struct ldlm_request) }; - return ldlm_req_handles_avail(exp, size, 2, 0); -} +#define ldlm_req_handles_avail(exp, size, bufcount, off) \ +({ \ + int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); \ + int _s = size[DLM_LOCKREQ_OFF]; \ + size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); \ + _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \ + bufcount, size); \ + _avail /= sizeof(struct lustre_handle); \ + _avail += LDLM_LOCKREQ_HANDLES - off; \ + size[DLM_LOCKREQ_OFF] = _s; \ + _avail; \ +}) /* Cancel lru locks and pack them into the enqueue request. Pack there the given * @count locks in @cancels. */ @@ -511,14 +501,15 @@ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, /* Estimate the amount of free space in the request. */ int avail = ldlm_req_handles_avail(exp, size, bufcount, LDLM_ENQUEUE_CANCEL_OFF); - int to_free = exp_connect_lru_resize(exp) ? 0 : 1; LASSERT(avail >= count); + /* Cancel lru locks here _only_ if the server supports * EARLY_CANCEL. Otherwise we have to send extra CANCEL * rpc right on enqueue, what will make it slower, vs. * asynchronous rpc in blocking thread. */ - count += ldlm_cancel_lru_local(ns, cancels, to_free, + count += ldlm_cancel_lru_local(ns, cancels, + exp_connect_lru_resize(exp) ? 0 : 1, avail - count, LDLM_CANCEL_AGED); size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_ENQUEUE); @@ -913,7 +904,6 @@ int ldlm_cli_cancel_req(struct obd_export *exp, "out of sync -- not fatal\n", libcfs_nid2str(req->rq_import-> imp_connection->c_peer.nid)); - rc = 0; } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/ req->rq_import_generation == imp->imp_generation) { ptlrpc_req_finished(req); @@ -992,25 +982,20 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) } rc = ldlm_cli_cancel_local(lock); + + if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) + GOTO(out, rc); + list_add(&lock->l_bl_ast, &cancels); + rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1); + EXIT; + out: + LDLM_LOCK_PUT(lock); + return rc < 0 ? rc : 0; - if (rc == LDLM_FL_BL_AST) { - rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1); - } else if (rc == LDLM_FL_CANCELING) { - int avail = ldlm_cancel_handles_avail(lock->l_conn_export); - int count = 1; - LASSERT(avail > 0); - count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace, - &cancels, 0, avail - 1, - LDLM_CANCEL_AGED); - ldlm_cli_cancel_list(&cancels, count, NULL, 0); - } - if (rc != LDLM_FL_CANCELING) - LDLM_LOCK_PUT(lock); - RETURN(rc < 0 ? rc : 0); } -/* - Free space in lru for @count new locks, +/* - Free space in lru for @count new locks, * redundant unused locks are canceled locally; * - also cancel locally unused aged locks; * - do not cancel more than @max locks; @@ -1176,13 +1161,21 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync) #endif count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0); if (sync == LDLM_ASYNC) { - rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count); - if (rc == 0) - RETURN(count); + struct ldlm_lock *lock, *next; + list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) { + /* Remove from the list to allow blocking thread to + * re-use l_bl_ast. */ + list_del_init(&lock->l_bl_ast); + rc = ldlm_bl_to_thread(ns, NULL, lock, + LDLM_FL_CANCELING); + if (rc) + list_add_tail(&lock->l_bl_ast, &next->l_bl_ast); + } } - /* If an error occured in ASYNC mode, or + + /* If some locks are left in the list in ASYNC mode, or * this is SYNC mode, cancel the list. */ - ldlm_cli_cancel_list(&cancels, count, NULL, 0); + ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF); RETURN(count); } @@ -1338,7 +1331,7 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE, 0, flags, opaque); - rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0); + rc = ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF); if (rc != ELDLM_OK) CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc); -- GitLab