From 77d5d2d66424d928eaba198d68cd60d9c49e8e32 Mon Sep 17 00:00:00 2001
From: vitaly <vitaly>
Date: Fri, 14 Sep 2007 18:46:13 +0000
Subject: [PATCH] Branch HEAD

b=13563
i=adilger
i=green

1) cancel LRU locks in ldlm_cli_cancel() and send one batched cancel RPC;
2) pass the whole list of canceled locks to the async blocking thread,
   not lock by lock, and send the cancels in batched cancel RPCs.
---
 lustre/ldlm/ldlm_internal.h |  7 +++-
 lustre/ldlm/ldlm_lock.c     |  2 +-
 lustre/ldlm/ldlm_lockd.c    | 62 +++++++++++++++++++---------
 lustre/ldlm/ldlm_request.c  | 82 +++++++++++++++++++++----------------
 4 files changed, 94 insertions(+), 59 deletions(-)

diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h
index 9b90f45ccf..d650df21ff 100644
--- a/lustre/ldlm/ldlm_internal.h
+++ b/lustre/ldlm/ldlm_internal.h
@@ -41,8 +41,11 @@ int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock);
 void ldlm_lock_destroy_nolock(struct ldlm_lock *lock);
 
 /* ldlm_lockd.c */
-int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                      struct ldlm_lock *lock, int flags);
+int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                           struct ldlm_lock *lock);
+int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                           struct list_head *cancels, int count);
+
 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
 
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 766aebac98..9db42bedc1 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -585,7 +585,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 ldlm_lock_remove_from_lru(lock);
                 unlock_res_and_lock(lock);
                 if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
-                    ldlm_bl_to_thread(ns, NULL, lock, 0) != 0)
+                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                         ldlm_handle_bl_callback(ns, NULL, lock);
         } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
                    !lock->l_readers && !lock->l_writers &&
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 5b22f25dfe..56e4b734b2 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -97,7 +97,8 @@ struct ldlm_bl_work_item {
         struct ldlm_namespace   *blwi_ns;
         struct ldlm_lock_desc   blwi_ld;
         struct ldlm_lock        *blwi_lock;
-        int                     blwi_flags;
+        struct list_head        blwi_head;
+        int                     blwi_count;
 };
 
 #ifdef __KERNEL__
@@ -1340,7 +1341,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                            cfs_time_add(lock->l_last_used,
                                         cfs_time_seconds(10)))) {
                 unlock_res_and_lock(lock);
-                if (ldlm_bl_to_thread(ns, NULL, lock, 0))
+                if (ldlm_bl_to_thread_lock(ns, NULL, lock))
                         ldlm_handle_bl_callback(ns, NULL, lock);
 
                 EXIT;
@@ -1362,14 +1363,18 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
         return ptlrpc_reply(req);
 }
 
-int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                      struct ldlm_lock *lock, int flags)
-{
 #ifdef __KERNEL__
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+                             struct list_head *cancels, int count)
+{
         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
         struct ldlm_bl_work_item *blwi;
         ENTRY;
 
+        if (cancels && count == 0)
+                RETURN(0);
+
         OBD_ALLOC(blwi, sizeof(*blwi));
         if (blwi == NULL)
                 RETURN(-ENOMEM);
@@ -1377,15 +1382,37 @@ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
         blwi->blwi_ns = ns;
         if (ld != NULL)
                 blwi->blwi_ld = *ld;
-        blwi->blwi_lock = lock;
-        blwi->blwi_flags = flags;
-
+        if (count) {
+                list_add(&blwi->blwi_head, cancels);
+                list_del_init(cancels);
+                blwi->blwi_count = count;
+        } else {
+                blwi->blwi_lock = lock;
+        }
         spin_lock(&blp->blp_lock);
         list_add_tail(&blwi->blwi_entry, &blp->blp_list);
         cfs_waitq_signal(&blp->blp_waitq);
         spin_unlock(&blp->blp_lock);
 
         RETURN(0);
+}
+#endif
+
+int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                           struct ldlm_lock *lock)
+{
+#ifdef __KERNEL__
+        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+#else
+        RETURN(-ENOSYS);
+#endif
+}
+
+int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                           struct list_head *cancels, int count)
+{
+#ifdef __KERNEL__
+        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
 #else
         RETURN(-ENOSYS);
 #endif
@@ -1534,7 +1561,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 CDEBUG(D_INODE, "blocking ast\n");
                 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
                         ldlm_callback_reply(req, 0);
-                if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0))
+                if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                 break;
         case LDLM_CP_CALLBACK:
@@ -1712,18 +1739,13 @@ static int ldlm_bl_thread_main(void *arg)
                 if (blwi->blwi_ns == NULL)
                         break;
 
-                if (blwi->blwi_flags == LDLM_FL_CANCELING) {
+                if (blwi->blwi_count) {
                         /* The special case when we cancel locks in lru
-                         * asynchronously, then we first remove the lock from
-                         * l_bl_ast explicitely in ldlm_cancel_lru before
-                         * sending it to this thread. Thus lock is marked
-                         * LDLM_FL_CANCELING, and already cancelled locally. */
-                        CFS_LIST_HEAD(head);
-                        LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast));
-                        list_add(&blwi->blwi_lock->l_bl_ast, &head);
-                        ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export,
-                                            &head, 1, 0);
-                        LDLM_LOCK_PUT(blwi->blwi_lock);
+                         * asynchronously: we pass the whole list of locks
+                         * here; each lock is already marked LDLM_FL_CANCELING
+                         * and canceled locally. */
+                        ldlm_cli_cancel_list(&blwi->blwi_head,
+                                             blwi->blwi_count, NULL, 0, 0);
                 } else {
                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                 blwi->blwi_lock);
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 25b742b606..08281b01eb 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -473,18 +473,28 @@ cleanup:
 /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
  * a single page on the send/receive side. XXX: 512 should be changed
  * to more adequate value. */
-#define ldlm_req_handles_avail(exp, size, bufcount, off)                \
-({                                                                      \
-        int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);      \
-        int _s = size[DLM_LOCKREQ_OFF];                                 \
-        size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);            \
-        _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \
-                                  bufcount, size);                      \
-        _avail /= sizeof(struct lustre_handle);                         \
-        _avail += LDLM_LOCKREQ_HANDLES - off;                           \
-        size[DLM_LOCKREQ_OFF] = _s;                                     \
-        _avail;                                                         \
-})
+static inline int ldlm_req_handles_avail(struct obd_export *exp,
+                                         int *size, int bufcount, int off)
+{
+        int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
+        int old_size = size[DLM_LOCKREQ_OFF];
+
+        size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+        avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
+                                 bufcount, size);
+        avail /= sizeof(struct lustre_handle);
+        avail += LDLM_LOCKREQ_HANDLES - off;
+        size[DLM_LOCKREQ_OFF] = old_size;
+
+        return avail;
+}
+
+static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
+{
+        int size[2] = { sizeof(struct ptlrpc_body),
+                        sizeof(struct ldlm_request) };
+        return ldlm_req_handles_avail(exp, size, 2, 0);
+}
 
 /* Cancel lru locks and pack them into the enqueue request. Pack there the given
  * @count locks in @cancels. */
@@ -912,6 +922,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                                "out of sync -- not fatal\n",
                                libcfs_nid2str(req->rq_import->
                                               imp_connection->c_peer.nid));
+                        rc = 0;
                 } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
                            req->rq_import_generation == imp->imp_generation) {
                         ptlrpc_req_finished(req);
@@ -934,7 +945,7 @@ out:
 
 int ldlm_cli_cancel(struct lustre_handle *lockh)
 {
         struct ldlm_lock *lock;
-        CFS_LIST_HEAD(head);
+        CFS_LIST_HEAD(cancels);
         int rc = 0;
         ENTRY;
@@ -946,15 +957,22 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         }
 
         rc = ldlm_cli_cancel_local(lock);
-        if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
-                GOTO(out, rc);
-
-        list_add(&lock->l_bl_ast, &head);
-        rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
-        EXIT;
-out:
-        LDLM_LOCK_PUT(lock);
-        return rc < 0 ? rc : 0;
+        list_add(&lock->l_bl_ast, &cancels);
+
+        if (rc == LDLM_FL_BL_AST) {
+                rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1, 0);
+        } else if (rc == LDLM_FL_CANCELING) {
+                int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+                int count = 1;
+                LASSERT(avail > 0);
+                count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace,
+                                               &cancels, 0, avail - 1,
+                                               LDLM_CANCEL_AGED);
+                ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        }
+        if (rc != LDLM_FL_CANCELING)
+                LDLM_LOCK_PUT(lock);
+        RETURN(rc < 0 ? rc : 0);
 }
 
 /* - Free space in lru for @count new locks,
@@ -1087,21 +1105,14 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
 #endif
 
         count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0);
         if (sync == LDLM_ASYNC) {
-                struct ldlm_lock *lock, *next;
-                list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) {
-                        /* Remove from the list to allow blocking thread to
-                         * re-use l_bl_ast. */
-                        list_del_init(&lock->l_bl_ast);
-                        rc = ldlm_bl_to_thread(ns, NULL, lock,
-                                               LDLM_FL_CANCELING);
-                        if (rc)
-                                list_add_tail(&lock->l_bl_ast, &next->l_bl_ast);
-                }
+                rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
+                if (rc == 0)
+                        RETURN(count);
         }
-        /* If some locks are left in the list in ASYNC mode, or
+        /* If an error occurred in ASYNC mode, or
          * this is SYNC mode, cancel the list. */
-        ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF, 0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
 
         RETURN(0);
 }
@@ -1264,8 +1275,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
         count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
                                            0, flags, opaque);
-        rc = ldlm_cli_cancel_list(&cancels, count, NULL,
-                                  DLM_LOCKREQ_OFF, flags);
+        rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
         if (rc != ELDLM_OK)
                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
 
-- 
GitLab
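
The hand-off that makes change 2) cheap is the list_add()/list_del_init()
pair added in ldlm_bl_to_thread(): the work item's blwi_head is linked into
the caller's cancel list, then the caller's list head is unlinked, so the
whole batch moves to the blocking thread in O(1) without touching the
individual locks. The standalone C sketch below mirrors that pattern; the
kernel-style list primitives are re-implemented for userspace, and
struct lock / struct work_item are simplified stand-ins for
struct ldlm_lock / struct ldlm_bl_work_item, not the real Lustre
definitions.

/*
 * Standalone userspace sketch (not Lustre code) of the O(1) list hand-off
 * used by ldlm_bl_to_thread().  The list primitives mirror the kernel's
 * <linux/list.h> semantics.
 */
#include <stdio.h>
#include <stddef.h>

struct list_head {
        struct list_head *next, *prev;
};

#define LIST_HEAD_INIT(name) { &(name), &(name) }

static void INIT_LIST_HEAD(struct list_head *h)
{
        h->next = h;
        h->prev = h;
}

/* link 'entry' in right after 'head' */
static void list_add(struct list_head *entry, struct list_head *head)
{
        entry->next = head->next;
        entry->prev = head;
        head->next->prev = entry;
        head->next = entry;
}

/* unlink 'entry' and leave it as an empty list of its own */
static void list_del_init(struct list_head *entry)
{
        entry->prev->next = entry->next;
        entry->next->prev = entry->prev;
        INIT_LIST_HEAD(entry);
}

#define container_of(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof(type, member)))

struct lock {                   /* stand-in for struct ldlm_lock */
        int id;
        struct list_head l_bl_ast;
};

struct work_item {              /* stand-in for struct ldlm_bl_work_item */
        struct list_head blwi_head;
        int blwi_count;
};

int main(void)
{
        struct list_head cancels = LIST_HEAD_INIT(cancels);
        struct lock locks[3];
        struct work_item blwi;
        struct list_head *pos;
        int i;

        /* the caller (ldlm_cancel_lru in the patch) gathers locks to cancel */
        for (i = 0; i < 3; i++) {
                locks[i].id = i;
                list_add(&locks[i].l_bl_ast, &cancels);
        }

        /* the two calls the patch adds in ldlm_bl_to_thread(): the work
         * item's head joins the chain, then the caller's head leaves it,
         * transferring the whole batch in constant time */
        list_add(&blwi.blwi_head, &cancels);
        list_del_init(&cancels);
        blwi.blwi_count = 3;

        /* the caller's list is empty again ... */
        printf("cancels empty after hand-off: %s\n",
               cancels.next == &cancels ? "yes" : "no");

        /* ... and the blocking thread walks the batch from blwi_head,
         * much as ldlm_cli_cancel_list() walks the real locks */
        for (pos = blwi.blwi_head.next; pos != &blwi.blwi_head; pos = pos->next)
                printf("lock %d queued for the batched cancel RPC\n",
                       container_of(pos, struct lock, l_bl_ast)->id);

        return 0;
}

Because only head pointers are relinked, the hand-off costs the same for one
lock or a hundred, and each lock's l_bl_ast linkage is reused as-is; this is
what lets the patch drop the per-lock list_del_init()/ldlm_bl_to_thread()
loop from ldlm_cancel_lru() and hand the whole batch to
ldlm_cli_cancel_list() for batched cancel RPCs.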