diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 9cd78771948335e24d0aa008f1bbafe979afe3a5..3eadc05f45afd21c2212bae30ecfd35624322b09 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -520,9 +520,10 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define OBD_CONNECT_QUOTA64 0x00080000ULL /* 64bit qunit_data.qd_count b=10707*/ #define OBD_CONNECT_MDS_CAPA 0x00100000ULL /* MDS capability */ #define OBD_CONNECT_OSS_CAPA 0x00200000ULL /* OSS capability */ -#define OBD_CONNECT_MDS_MDS 0x00400000ULL /* MDS-MDS connection*/ +#define OBD_CONNECT_CANCELSET 0x00400000ULL /* Early batched cancels. */ #define OBD_CONNECT_SOM 0x00800000ULL /* SOM feature */ -#define OBD_CONNECT_CANCELSET 0x01000000ULL /* Early batched cancels. */ +#define OBD_CONNECT_AT 0x01000000ULL /* client uses adaptive timeouts */ +#define OBD_CONNECT_MDS_MDS 0x02000000ULL /* MDS-MDS connection*/ #define OBD_CONNECT_REAL 0x00000200ULL /* real connection */ /* also update obd_connect_names[] for lprocfs_rd_connect_flags() * and lustre/utils/wirecheck.c */ diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index cbf2b596497d6c6229e28096daa725cc3ebf8e7e..8c7e4720504bebc2ff26cde61c6c0a8e62d97115 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -128,8 +128,21 @@ typedef enum { * w/o involving separate thread. in order to decrease cs rate */ #define LDLM_FL_ATOMIC_CB 0x4000000 +/* It may happen that a client initiate 2 operations, e.g. unlink and mkdir, + * such that server send blocking ast for conflict locks to this client for + * the 1st operation, whereas the 2nd operation has canceled this lock and + * is waiting for rpc_lock which is taken by the 1st operation. + * LDLM_FL_BL_AST is to be set by ldlm_callback_handler() to the lock not allow + * ELC code to cancel it. + * LDLM_FL_BL_DONE is to be set by ldlm_cancel_callback() when lock cache is + * droped to let ldlm_callback_handler() return EINVAL to the server. It is + * used when ELC rpc is already prepared and is waiting for rpc_lock, too late + * to send a separate CANCEL rpc. */ +#define LDLM_FL_BL_AST 0x10000000 +#define LDLM_FL_BL_DONE 0x20000000 + /* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */ -#define LDLM_FL_ASYNC 0x20000000 +#define LDLM_FL_ASYNC 0x40000000 /* The blocking callback is overloaded to perform two functions. These flags * indicate which operation should be performed. */ @@ -364,6 +377,16 @@ struct ldlm_ast_work { int w_datalen; }; +/* ldlm_enqueue parameters common */ +struct ldlm_enqueue_info { + __u32 ei_type; /* Type of the lock being enqueued. */ + __u32 ei_mode; /* Mode of the lock being enqueued. */ + void *ei_cb_bl; /* Different callbacks for lock handling (blocking, */ + void *ei_cb_cp; /* completion, glimpse) */ + void *ei_cb_gl; + void *ei_cbdata; /* Data to be passed into callbacks. */ +}; + extern struct obd_ops ldlm_obd_ops; extern char *ldlm_lockname[]; @@ -434,7 +457,6 @@ int ldlm_replay_locks(struct obd_import *imp); void ldlm_resource_iterate(struct ldlm_namespace *, const struct ldlm_res_id *, ldlm_iterator_t iter, void *data); - /* ldlm_flock.c */ int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data); @@ -476,6 +498,18 @@ static inline struct ldlm_lock *ldlm_handle2lock(const struct lustre_handle *h) return __ldlm_handle2lock(h, 0); } +static inline int ldlm_res_lvbo_update(struct ldlm_resource *res, + struct lustre_msg *m, int buf_idx, + int increase) +{ + if (res->lr_namespace->ns_lvbo && + res->lr_namespace->ns_lvbo->lvbo_update) { + return res->lr_namespace->ns_lvbo->lvbo_update(res, m, buf_idx, + increase); + } + return 0; +} + #define LDLM_LOCK_PUT(lock) \ do { \ /*LDLM_DEBUG((lock), "put");*/ \ @@ -564,13 +598,10 @@ int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp); int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data); int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, + struct ldlm_enqueue_info *einfo, const struct ldlm_res_id *res_id, - ldlm_type_t type, ldlm_policy_data_t *policy, - ldlm_mode_t mode, int *flags, - ldlm_blocking_callback blocking, - ldlm_completion_callback completion, - ldlm_glimpse_callback glimpse, - void *data, void *lvb, __u32 lvb_len, void *lvb_swabber, + ldlm_policy_data_t *policy, int *flags, + void *lvb, __u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh, int async); struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, int bufcount, int *size, @@ -603,7 +634,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *, const struct ldlm_res_id *, int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, ldlm_policy_data_t *policy, - int mode, int flags, void *opaque); + ldlm_mode_t mode, int flags, void *opaque); int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *head, int count, int flags); int ldlm_cli_join_lru(struct ldlm_namespace *, diff --git a/lustre/include/lustre_mds.h b/lustre/include/lustre_mds.h index c36024df3eea53b8e87f26976aa3af1fe97901ab..b051786b482faf5cb9dbc2bdc938ef9fed443617 100644 --- a/lustre/include/lustre_mds.h +++ b/lustre/include/lustre_mds.h @@ -56,6 +56,7 @@ struct mds_update_record { __u32 ur_mode; __u32 ur_flags; struct lvfs_grp_hash_entry *ur_grp_entry; + struct ldlm_request *ur_dlm; }; /* file data for open files on MDS */ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 9446c160dfa385f2c18bcb88d070bf353c9e2a08..d16b2ea8b683e55357254c4da70cb106d78095c2 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -132,31 +132,15 @@ struct obd_info; typedef int (*obd_enqueue_update_f)(struct obd_info *oinfo, int rc); -/* obd_enqueue parameters common for all levels (lov, osc). */ -struct obd_enqueue_info { - /* Flags used while lock handling. */ - int ei_flags; - /* Type of the lock being enqueued. */ - __u32 ei_type; - /* Mode of the lock being enqueued. */ - __u32 ei_mode; - /* Different callbacks for lock handling (blocking, completion, - glimpse */ - void *ei_cb_bl; - void *ei_cb_cp; - void *ei_cb_gl; - /* Data to be passed into callbacks. */ - void *ei_cbdata; - /* Request set for OSC async requests. */ - struct ptlrpc_request_set *ei_rqset; -}; - /* obd info for a particular level (lov, osc). */ struct obd_info { /* Lock policy. It keeps an extent which is specific for a particular * OSC. (e.g. lov_prep_enqueue_set initialises extent of the policy, * and osc_enqueue passes it into ldlm_lock_match & ldlm_cli_enqueue. */ ldlm_policy_data_t oi_policy; + /* Flags used while lock handling. The flags obtained on the enqueue + * request are set here, therefore they are request specific. */ + int oi_flags; /* Lock handle specific for every OSC lock. */ struct lustre_handle *oi_lockh; /* lsm data specific for every OSC. */ @@ -1144,7 +1128,8 @@ struct obd_ops { int niocount, struct niobuf_local *local, struct obd_trans_info *oti, int rc); int (*o_enqueue)(struct obd_export *, struct obd_info *oinfo, - struct obd_enqueue_info *einfo); + struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *rqset); int (*o_match)(struct obd_export *, struct lov_stripe_md *, __u32 type, ldlm_policy_data_t *, __u32 mode, int *flags, void *data, struct lustre_handle *lockh); @@ -1241,10 +1226,9 @@ struct md_ops { __u64, struct ptlrpc_request **); int (*m_done_writing)(struct obd_export *, struct md_op_data *, struct obd_client_handle *); - int (*m_enqueue)(struct obd_export *, int, struct lookup_intent *, - int, struct md_op_data *, struct lustre_handle *, - void *, int, ldlm_completion_callback, - ldlm_blocking_callback, void *, int); + int (*m_enqueue)(struct obd_export *, struct ldlm_enqueue_info *, + struct lookup_intent *, struct md_op_data *, + struct lustre_handle *, void *, int, int); int (*m_getattr)(struct obd_export *, const struct lu_fid *, struct obd_capa *, obd_valid, int, struct ptlrpc_request **); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 90662fe805a47cad2f482f140220f351523afabd..385df8fe294112a6b0ffee37074b09312e5481cc 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1357,30 +1357,30 @@ static inline int obd_iocontrol(unsigned int cmd, struct obd_export *exp, static inline int obd_enqueue_rqset(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo) { + struct ptlrpc_request_set *set = NULL; int rc; ENTRY; EXP_CHECK_DT_OP(exp, enqueue); EXP_COUNTER_INCREMENT(exp, enqueue); - einfo->ei_rqset = ptlrpc_prep_set(); - if (einfo->ei_rqset == NULL) + set = ptlrpc_prep_set(); + if (set == NULL) RETURN(-ENOMEM); - rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo); + rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo, set); if (rc == 0) - rc = ptlrpc_set_wait(einfo->ei_rqset); - ptlrpc_set_destroy(einfo->ei_rqset); - einfo->ei_rqset = NULL; - + rc = ptlrpc_set_wait(set); + ptlrpc_set_destroy(set); RETURN(rc); } static inline int obd_enqueue(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *set) { int rc; ENTRY; @@ -1388,7 +1388,7 @@ static inline int obd_enqueue(struct obd_export *exp, EXP_CHECK_DT_OP(exp, enqueue); EXP_COUNTER_INCREMENT(exp, enqueue); - rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo); + rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo, set); RETURN(rc); } @@ -1714,23 +1714,20 @@ static inline int md_done_writing(struct obd_export *exp, RETURN(rc); } -static inline int md_enqueue(struct obd_export *exp, int lock_type, - struct lookup_intent *it, int lock_mode, +static inline int md_enqueue(struct obd_export *exp, + struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct md_op_data *op_data, struct lustre_handle *lockh, void *lmm, int lmmsize, - ldlm_completion_callback cb_completion, - ldlm_blocking_callback cb_blocking, - void *cb_data, int extra_lock_flags) + int extra_lock_flags) { int rc; ENTRY; EXP_CHECK_MD_OP(exp, enqueue); EXP_MD_COUNTER_INCREMENT(exp, enqueue); - rc = MDP(exp->exp_obd, enqueue)(exp, lock_type, it, lock_mode, - op_data, lockh, lmm, lmmsize, - cb_completion, cb_blocking, - cb_data, extra_lock_flags); + rc = MDP(exp->exp_obd, enqueue)(exp, einfo, it, op_data, lockh, + lmm, lmmsize, extra_lock_flags); RETURN(rc); } diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h index cfc9e5d5e24ced79ba07388b270f30f17d1f2bb4..23bbb044d2621abea864c71582e7244afc56b7fd 100644 --- a/lustre/include/obd_ost.h +++ b/lustre/include/obd_ost.h @@ -31,7 +31,7 @@ struct osc_async_args { struct osc_enqueue_args { struct obd_export *oa_exp; struct obd_info *oa_oi; - struct obd_enqueue_info *oa_ei; + struct ldlm_enqueue_info*oa_ei; }; #endif diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 1bfe62d61f604cae24d3acbc3f59e7e08411a7c0..f6717a115dd910d5b0a88599f984d4503b8e99e6 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -162,6 +162,8 @@ extern int obd_race_state; #define OBD_FAIL_LDLM_RECOV_CLIENTS 0x30d #define OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT 0x30e #define OBD_FAIL_LDLM_GLIMPSE 0x30f +#define OBD_FAIL_LDLM_CANCEL_RACE 0x310 +#define OBD_FAIL_LDLM_CANCEL_EVICT_RACE 0x311 #define OBD_FAIL_OSC 0x400 #define OBD_FAIL_OSC_BRW_READ_BULK 0x401 diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 7d9e1bf337d70e0358c8aa01eb4d90e242737708..82fbeeda80b4347b954fa66822b37ea6fe5429d2 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -382,7 +382,8 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_resource_unlink_lock(lock); - ldlm_extent_policy(res, lock, flags); + if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) + ldlm_extent_policy(res, lock, flags); ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 62b2f03cf422ab12e0afc532e291c19c0013f34e..766aebac98c3e6b8a3bc92df8b829e028cbb8e36 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1389,6 +1389,7 @@ void ldlm_cancel_callback(struct ldlm_lock *lock) LDLM_DEBUG(lock, "no blocking ast"); } } + lock->l_flags |= LDLM_FL_BL_DONE; } void ldlm_unlink_lock_skiplist(struct ldlm_lock *req) @@ -1504,6 +1505,8 @@ void ldlm_cancel_locks_for_export(struct obd_export *exp) spin_unlock(&exp->exp_ldlm_data.led_lock); LDLM_DEBUG(lock, "export %p", exp); + ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_lock_cancel(lock); ldlm_reprocess_all(res); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index d5385edd631ab709ba4b868e5bf458026bd6083c..5b22f25dfe31fae07d561d70295c1e251de3d844 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -582,9 +582,14 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, rc = ptlrpc_queue_wait(req); OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); } - - if (rc != 0) + if (rc != 0) { + /* If client canceled the lock but the cancel has not been + * recieved yet, we need to update lvbo to have the proper + * attributes cached. */ + if (rc == -EINVAL) + ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1); rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); + } ptlrpc_req_finished(req); @@ -734,8 +739,8 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) else if (rc != 0) rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); else - rc = res->lr_namespace->ns_lvbo->lvbo_update - (res, req->rq_repmsg, REPLY_REC_OFF, 1); + rc = ldlm_res_lvbo_update(res, req->rq_repmsg, + REPLY_REC_OFF, 1); ptlrpc_req_finished(req); RETURN(rc); } @@ -996,8 +1001,8 @@ existing_lock: LDLM_DEBUG(lock, "server-side enqueue handler, sending reply" "(err=%d, rc=%d)", err, rc); + lock_res_and_lock(lock); if (rc == 0) { - lock_res_and_lock(lock); size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len; if (size[DLM_REPLY_REC_OFF] > 0) { void *lvb = lustre_msg_buf(req->rq_repmsg, @@ -1009,13 +1014,11 @@ existing_lock: memcpy(lvb, lock->l_resource->lr_lvb_data, size[DLM_REPLY_REC_OFF]); } - unlock_res_and_lock(lock); } else { - lock_res_and_lock(lock); ldlm_resource_unlink_lock(lock); ldlm_lock_destroy_nolock(lock); - unlock_res_and_lock(lock); } + unlock_res_and_lock(lock); if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK) ldlm_reprocess_all(lock->l_resource); @@ -1134,6 +1137,8 @@ int ldlm_request_cancel(struct ptlrpc_request *req, int i, count, done = 0; ENTRY; + LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, " + "starting at %d", dlm_req->lock_count, first); count = dlm_req->lock_count ? dlm_req->lock_count : 1; if (first >= count) RETURN(0); @@ -1143,8 +1148,6 @@ int ldlm_request_cancel(struct ptlrpc_request *req, if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) RETURN(0); - LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks", - count - first); for (i = first; i < count; i++) { lock = ldlm_handle2lock(&dlm_req->lock_handle[i]); if (!lock) { @@ -1156,32 +1159,22 @@ int ldlm_request_cancel(struct ptlrpc_request *req, res = lock->l_resource; done++; - ldlm_lock_cancel(lock); - if (ldlm_del_waiting_lock(lock)) - CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock); if (res != pres) { if (pres != NULL) { - if (pres->lr_namespace->ns_lvbo && - pres->lr_namespace->ns_lvbo->lvbo_update) { - (void)pres->lr_namespace->ns_lvbo-> - lvbo_update(pres, NULL, 0, 1); - } ldlm_reprocess_all(pres); ldlm_resource_putref(pres); } - if (res != NULL) + if (res != NULL) { ldlm_resource_getref(res); + ldlm_res_lvbo_update(res, NULL, 0, 1); + } pres = res; } + ldlm_lock_cancel(lock); LDLM_LOCK_PUT(lock); } if (pres != NULL) { - if (pres->lr_namespace->ns_lvbo && - pres->lr_namespace->ns_lvbo->lvbo_update) { - (void)pres->lr_namespace->ns_lvbo-> - lvbo_update(pres, NULL, 0, 1); - } ldlm_reprocess_all(pres); ldlm_resource_putref(pres); } @@ -1500,8 +1493,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]); if (!lock) { - CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n", - dlm_req->lock_handle[0].cookie); + CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock " + "disappeared\n", dlm_req->lock_handle[0].cookie); ldlm_callback_reply(req, -EINVAL); RETURN(0); } @@ -1509,6 +1502,22 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */ lock_res_and_lock(lock); lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS); + if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) { + /* If somebody cancels locks and cache is already droped, + * we can tell the server we have no lock. Otherwise, we + * should send cancel after dropping the cache. */ + if ((lock->l_flags & LDLM_FL_CANCELING) && + (lock->l_flags & LDLM_FL_BL_DONE)) { + LDLM_DEBUG(lock, "callback on lock " + LPX64" - lock disappeared\n", + dlm_req->lock_handle[0].cookie); + unlock_res_and_lock(lock); + LDLM_LOCK_PUT(lock); + ldlm_callback_reply(req, -EINVAL); + RETURN(0); + } + lock->l_flags |= LDLM_FL_BL_AST; + } unlock_res_and_lock(lock); /* We want the ost thread to get this reply so that it can respond diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index f06737c94b80d2a4d322efdafd376a2f63e11e68..25b742b6061eb1706b10a81ed93fbb5c8ca7076c 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -487,7 +487,7 @@ cleanup: }) /* Cancel lru locks and pack them into the enqueue request. Pack there the given - * @count locks in @cancel. */ + * @count locks in @cancels. */ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, int bufcount, int *size, struct list_head *cancels, @@ -539,13 +539,10 @@ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, * request was created in ldlm_cli_enqueue and it is the async request, * pass it to the caller in @reqp. */ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, + struct ldlm_enqueue_info *einfo, const struct ldlm_res_id *res_id, - ldlm_type_t type, ldlm_policy_data_t *policy, - ldlm_mode_t mode, int *flags, - ldlm_blocking_callback blocking, - ldlm_completion_callback completion, - ldlm_glimpse_callback glimpse, - void *data, void *lvb, __u32 lvb_len, void *lvb_swabber, + ldlm_policy_data_t *policy, int *flags, + void *lvb, __u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh, int async) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; @@ -570,12 +567,14 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_DEBUG(lock, "client-side enqueue START"); LASSERT(exp == lock->l_conn_export); } else { - lock = ldlm_lock_create(ns, res_id, type, mode, blocking, - completion, glimpse, data, lvb_len); + lock = ldlm_lock_create(ns, res_id, einfo->ei_type, + einfo->ei_mode, einfo->ei_cb_bl, + einfo->ei_cb_cp, einfo->ei_cb_gl, + einfo->ei_cbdata, lvb_len); if (lock == NULL) RETURN(-ENOMEM); /* for the local lock, add the reference */ - ldlm_lock_addref_internal(lock, mode); + ldlm_lock_addref_internal(lock, einfo->ei_mode); ldlm_lock2handle(lock, lockh); lock->l_lvb_swabber = lvb_swabber; if (policy != NULL) { @@ -584,8 +583,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, * descriptor (ldlm_lock2desc() below) but use an * inodebits lock internally with both bits set. */ - if (type == LDLM_IBITS && !(exp->exp_connect_flags & - OBD_CONNECT_IBITS)) + if (einfo->ei_type == LDLM_IBITS && + !(exp->exp_connect_flags & OBD_CONNECT_IBITS)) lock->l_policy_data.l_inodebits.bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE; @@ -593,7 +592,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, lock->l_policy_data = *policy; } - if (type == LDLM_EXTENT) + if (einfo->ei_type == LDLM_EXTENT) lock->l_req_extent = policy->l_extent; LDLM_DEBUG(lock, "client-side enqueue START"); } @@ -603,7 +602,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, if (reqp == NULL || *reqp == NULL) { req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (req == NULL) { - failed_lock_cleanup(ns, lock, lockh, mode); + failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode); LDLM_LOCK_PUT(lock); RETURN(-ENOMEM); } @@ -621,7 +620,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, lock->l_conn_export = exp; lock->l_export = NULL; - lock->l_blocking_ast = blocking; + lock->l_blocking_ast = einfo->ei_cb_bl; /* Dump lock data into the request buffer */ body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); @@ -640,7 +639,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where * [i_size, OBD_OBJECT_EOF] lock is taken. */ - LASSERT(ergo(LIBLUSTRE_CLIENT, type != LDLM_EXTENT || + LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT || policy->l_extent.end == OBD_OBJECT_EOF)); if (async) { @@ -650,9 +649,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_DEBUG(lock, "sending request"); rc = ptlrpc_queue_wait(req); - err = ldlm_cli_enqueue_fini(exp, req, type, policy ? 1 : 0, - mode, flags, lvb, lvb_len, lvb_swabber, - lockh, rc); + err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0, + einfo->ei_mode, flags, lvb, lvb_len, + lvb_swabber, lockh, rc); /* If ldlm_cli_enqueue_fini did not find the lock, we need to free * one reference that we took */ @@ -772,10 +771,13 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) } /* Cancel locks locally. - * Returns: 1 if there is a need to send a cancel RPC to server. 0 otherwise. */ + * Returns: + * LDLM_FL_LOCAL_ONLY if tere is no need in a CANCEL rpc to the server; + * LDLM_FL_CANCELING otherwise; + * LDLM_FL_BL_AST if there is a need in a separate CANCEL rpc. */ static int ldlm_cli_cancel_local(struct ldlm_lock *lock) { - int rc = 0; + int rc = LDLM_FL_LOCAL_ONLY; ENTRY; if (lock->l_conn_export) { @@ -788,14 +790,15 @@ static int ldlm_cli_cancel_local(struct ldlm_lock *lock) local_only = (lock->l_flags & (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)); ldlm_cancel_callback(lock); + rc = (lock->l_flags & LDLM_FL_BL_AST) ? + LDLM_FL_BL_AST : LDLM_FL_CANCELING; unlock_res_and_lock(lock); - if (local_only) - CDEBUG(D_INFO, "not sending request (at caller's " + if (local_only) { + CDEBUG(D_DLMTRACE, "not sending request (at caller's " "instruction)\n"); - else - rc = 1; - + rc = LDLM_FL_LOCAL_ONLY; + } ldlm_lock_cancel(lock); } else { if (lock->l_resource->lr_namespace->ns_client) { @@ -818,7 +821,7 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, int off, { struct ldlm_request *dlm; struct ldlm_lock *lock; - int max; + int max, packed = 0; ENTRY; dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm)); @@ -837,22 +840,13 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, int off, list_for_each_entry(lock, head, l_bl_ast) { if (!count--) break; - /* Pack the lock handle to the given request buffer. */ LASSERT(lock->l_conn_export); - /* Cannot be set on a lock in a resource granted list.*/ - LASSERT(!(lock->l_flags & - (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK))); - /* If @lock is marked CANCEL_ON_BLOCK, cancel - * will not be sent in ldlm_cli_cancel(). It - * is used for liblustre clients, no cancel on - * block requests. However, even for liblustre - * clients, when the flag is set, batched cancel - * should be sent (what if no block rpc has - * come). To not send another separated rpc in - * this case, the caller pass CANCEL_ON_BLOCK - * flag to ldlm_cli_cancel_unused_resource(). */ + /* Pack the lock handle to the given request buffer. */ + LDLM_DEBUG(lock, "packing"); dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle; + packed++; } + CDEBUG(D_DLMTRACE, "%d locks packed\n", packed); EXIT; } @@ -873,6 +867,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels, LASSERT(exp != NULL); LASSERT(count > 0); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE)) + RETURN(count); + free = ldlm_req_handles_avail(exp, size, 2, 0); if (count > free) count = free; @@ -943,11 +940,13 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) /* concurrent cancels on the same handle can happen */ lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING); - if (lock == NULL) + if (lock == NULL) { + LDLM_DEBUG_NOLOCK("lock is already being destroyed\n"); RETURN(0); - + } + rc = ldlm_cli_cancel_local(lock); - if (rc <= 0) + if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) GOTO(out, rc); list_add(&lock->l_bl_ast, &head); @@ -981,15 +980,22 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, spin_lock(&ns->ns_unused_lock); count += ns->ns_nr_unused - ns->ns_max_unused; while (!list_empty(&ns->ns_unused_list)) { - struct list_head *tmp = ns->ns_unused_list.next; - lock = list_entry(tmp, struct ldlm_lock, l_lru); - if (max && added >= max) break; + list_for_each_entry(lock, &ns->ns_unused_list, l_lru) { + /* somebody is already doing CANCEL or there is a + * blocking request will send cancel. */ + if (!(lock->l_flags & LDLM_FL_CANCELING) && + !(lock->l_flags & LDLM_FL_BL_AST)) + break; + } + if (&lock->l_lru == &ns->ns_unused_list) + break; + if ((added >= count) && (!(flags & LDLM_CANCEL_AGED) || - cfs_time_before_64(cur, ns->ns_max_age + + cfs_time_before_64(cur, (__u64)ns->ns_max_age + lock->l_last_used))) break; @@ -997,10 +1003,14 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, spin_unlock(&ns->ns_unused_lock); lock_res_and_lock(lock); - if ((ldlm_lock_remove_from_lru(lock) == 0) || - (lock->l_flags & LDLM_FL_CANCELING)) { + /* Check flags again under the lock. */ + if ((lock->l_flags & LDLM_FL_CANCELING) || + (lock->l_flags & LDLM_FL_BL_AST) || + (ldlm_lock_remove_from_lru(lock) == 0)) { /* other thread is removing lock from lru or - * somebody is already doing CANCEL. */ + * somebody is already doing CANCEL or + * there is a blocking request which will send + * cancel by itseft. */ unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); spin_lock(&ns->ns_unused_lock); @@ -1040,13 +1050,24 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { if (left-- == 0) break; + rc = ldlm_cli_cancel_local(lock); - if (rc == 0) { + if (rc == LDLM_FL_BL_AST) { + CFS_LIST_HEAD(head); + + LDLM_DEBUG(lock, "Cancel lock separately"); + list_del_init(&lock->l_bl_ast); + list_add(&lock->l_bl_ast, &head); + ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0); + rc = LDLM_FL_LOCAL_ONLY; + } + if (rc == LDLM_FL_LOCAL_ONLY) { /* CANCEL RPC should not be sent to server. */ list_del_init(&lock->l_bl_ast); LDLM_LOCK_PUT(lock); added--; } + } RETURN(added); } @@ -1114,6 +1135,12 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, continue; } + /* If somebody is already doing CANCEL, or blocking ast came, + * skip this lock. */ + if (lock->l_flags & LDLM_FL_BL_AST || + lock->l_flags & LDLM_FL_CANCELING) + continue; + if (lockmode_compat(lock->l_granted_mode, mode)) continue; @@ -1124,10 +1151,6 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, policy->l_inodebits.bits)) continue; - /* If somebody is already doing CANCEL, skip it. */ - if (lock->l_flags & LDLM_FL_CANCELING) - continue; - /* See CBPENDING comment in ldlm_cancel_lru */ lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING | lock_flags; @@ -1142,7 +1165,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, /* Handle only @count inserted locks. */ left = count; list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { - int rc = 0; + int rc = LDLM_FL_LOCAL_ONLY; if (left-- == 0) break; @@ -1151,7 +1174,16 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, else rc = ldlm_cli_cancel_local(lock); - if (rc == 0) { + if (rc == LDLM_FL_BL_AST) { + CFS_LIST_HEAD(head); + + LDLM_DEBUG(lock, "Cancel lock separately"); + list_del_init(&lock->l_bl_ast); + list_add(&lock->l_bl_ast, &head); + ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0); + rc = LDLM_FL_LOCAL_ONLY; + } + if (rc == LDLM_FL_LOCAL_ONLY) { /* CANCEL RPC should not be sent to server. */ list_del_init(&lock->l_bl_ast); LDLM_LOCK_PUT(lock); @@ -1215,7 +1247,7 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count, int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, ldlm_policy_data_t *policy, - int mode, int flags, void *opaque) + ldlm_mode_t mode, int flags, void *opaque) { struct ldlm_resource *res; CFS_LIST_HEAD(cancels); diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c index 8d632eadc8f45014648c7554f42700ff2964f792..db2048835e061dbb005d9b098a4120fca74a5496 100644 --- a/lustre/liblustre/dir.c +++ b/lustre/liblustre/dir.c @@ -85,13 +85,15 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page) rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh); if (!rc) { + struct ldlm_enqueue_info einfo = {LDLM_IBITS, LCK_CR, + llu_md_blocking_ast, ldlm_completion_ast, NULL, inode}; + llu_prep_md_op_data(&op_data, inode, NULL, NULL, 0, 0, LUSTRE_OPC_ANY); - rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, &it, LCK_CR, + rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, &op_data, &lockh, NULL, 0, - ldlm_completion_ast, llu_md_blocking_ast, - inode, LDLM_FL_CANCEL_ON_BLOCK); + LDLM_FL_CANCEL_ON_BLOCK); request = (struct ptlrpc_request *)it.d.lustre.it_data; if (request) ptlrpc_req_finished(request); diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index b716544be8e97c4e090ed32abc463c118d094409..c4946af693e573835b09758bb2aebabb672a0caa 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -269,7 +269,7 @@ int llu_glimpse_size(struct inode *inode) struct intnl_stat *st = llu_i2stat(inode); struct llu_sb_info *sbi = llu_i2sbi(inode); struct lustre_handle lockh = { 0 }; - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; int rc; ENTRY; @@ -288,7 +288,6 @@ int llu_glimpse_size(struct inode *inode) einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_flags = LDLM_FL_HAS_INTENT; einfo.ei_cb_bl = llu_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = llu_glimpse_callback; @@ -297,6 +296,7 @@ int llu_glimpse_size(struct inode *inode) oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; oinfo.oi_lockh = &lockh; oinfo.oi_md = lli->lli_smd; + oinfo.oi_flags = LDLM_FL_HAS_INTENT; rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo); if (rc) { @@ -318,7 +318,7 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, { struct llu_sb_info *sbi = llu_i2sbi(inode); struct intnl_stat *st = llu_i2stat(inode); - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ost_lvb lvb; int rc; @@ -338,7 +338,6 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = mode; - einfo.ei_flags = ast_flags; einfo.ei_cb_bl = llu_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = llu_glimpse_callback; @@ -347,8 +346,9 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, oinfo.oi_policy = *policy; oinfo.oi_lockh = lockh; oinfo.oi_md = lsm; + oinfo.oi_flags = ast_flags; - rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo); + rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL); *policy = oinfo.oi_policy; if (rc > 0) rc = -EIO; diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 60c109b8bcc11280f66d0fad8967fc36e4881710..6c9dc66b96ed2804c11e9ee6c41f0448d1d17d98 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -1338,9 +1338,11 @@ static int llu_file_flock(struct inode *ino, fid_oid(&lli->lli_fid), fid_ver(&lli->lli_fid), LDLM_FLOCK} }; + struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL, + ldlm_flock_completion_ast, NULL, file_lock }; + struct lustre_handle lockh = {0}; ldlm_policy_data_t flock; - ldlm_mode_t mode = 0; int flags = 0; int rc; @@ -1353,13 +1355,13 @@ static int llu_file_flock(struct inode *ino, switch (file_lock->fl_type) { case F_RDLCK: - mode = LCK_PR; + einfo.ei_mode = LCK_PR; break; case F_UNLCK: - mode = LCK_NL; + einfo.ei_mode = LCK_NL; break; case F_WRLCK: - mode = LCK_PW; + einfo.ei_mode = LCK_PW; break; default: CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type); @@ -1390,7 +1392,7 @@ static int llu_file_flock(struct inode *ino, #endif #endif flags = LDLM_FL_TEST_LOCK; - file_lock->fl_type = mode; + file_lock->fl_type = einfo.ei_mode; break; default: CERROR("unknown fcntl cmd: %d\n", cmd); @@ -1399,13 +1401,11 @@ static int llu_file_flock(struct inode *ino, CDEBUG(D_DLMTRACE, "inode=%llu, pid=%u, flags=%#x, mode=%u, " "start="LPU64", end="LPU64"\n", (unsigned long long)st->st_ino, - flock.l_flock.pid, flags, mode, flock.l_flock.start, + flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end); - rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, &res_id, - LDLM_FLOCK, &flock, mode, &flags, NULL, - ldlm_flock_completion_ast, NULL, - file_lock, NULL, 0, NULL, &lockh, 0); + rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, &einfo, &res_id, + &flock, &flags, NULL, 0, NULL, &lockh, 0); RETURN(rc); } @@ -1691,6 +1691,9 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags, struct llu_inode_info *lli2 = NULL; struct lov_stripe_md *lsm; struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags}; + struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CR, + llu_md_blocking_ast, ldlm_completion_ast, NULL, NULL }; + struct ptlrpc_request *req = NULL; struct lustre_md md; struct md_op_data data; @@ -1720,9 +1723,8 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags, llu_prep_md_op_data(&data, NULL, ino, NULL, 0, O_RDWR, LUSTRE_OPC_ANY); - rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, &oit, LCK_CR, &data, - &lockh, lum, lum_size, ldlm_completion_ast, - llu_md_blocking_ast, NULL, LDLM_FL_INTENT_ONLY); + rc = md_enqueue(sbi->ll_md_exp, &einfo, &oit, &data, + &lockh, lum, lum_size, LDLM_FL_INTENT_ONLY); if (rc) GOTO(out, rc); diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index bb6cdf452e2daed2b2f9e899487a1fdc698c1ab7..4c463d10e5b2283d95f1d4dee0e5515da86343df 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -297,6 +297,8 @@ static struct page *ll_get_dir_page(struct inode *dir, __u32 hash, int exact, rc = md_lock_match(ll_i2sbi(dir)->ll_md_exp, LDLM_FL_BLOCK_GRANTED, ll_inode2fid(dir), LDLM_IBITS, &policy, mode, &lockh); if (!rc) { + struct ldlm_enqueue_info einfo = { LDLM_IBITS, mode, + ll_md_blocking_ast, ldlm_completion_ast, NULL, dir }; struct lookup_intent it = { .it_op = IT_READDIR }; struct ptlrpc_request *request; struct md_op_data *op_data; @@ -306,10 +308,8 @@ static struct page *ll_get_dir_page(struct inode *dir, __u32 hash, int exact, if (IS_ERR(op_data)) return (void *)op_data; - rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, LDLM_IBITS, &it, - mode, op_data, &lockh, NULL, 0, - ldlm_completion_ast, ll_md_blocking_ast, dir, - 0); + rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, &einfo, &it, + op_data, &lockh, NULL, 0, 0); ll_finish_md_op_data(op_data); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 82a6108292ba7c1bc48baf2460e065a632427c26..62b0bd06c80b02670473630740c9f0f4f47e958f 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1101,7 +1101,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, lstat_t *st) { struct lustre_handle lockh = { 0 }; - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ost_lvb lvb; int rc; @@ -1110,7 +1110,6 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_flags = LDLM_FL_HAS_INTENT; einfo.ei_cb_bl = ll_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = ll_glimpse_callback; @@ -1119,6 +1118,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; oinfo.oi_lockh = &lockh; oinfo.oi_md = lsm; + oinfo.oi_flags = LDLM_FL_HAS_INTENT; rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo); if (rc == -ENOENT) @@ -1149,7 +1149,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); struct lustre_handle lockh = { 0 }; - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; int rc; ENTRY; @@ -1173,7 +1173,6 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) * acquired only if there were no conflicting locks. */ einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT; einfo.ei_cb_bl = ll_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = ll_glimpse_callback; @@ -1182,6 +1181,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; oinfo.oi_lockh = &lockh; oinfo.oi_md = lli->lli_smd; + oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT; rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo); if (rc == -ENOENT) @@ -1206,7 +1206,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, { struct ll_sb_info *sbi = ll_i2sbi(inode); struct ost_lvb lvb; - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; int rc; ENTRY; @@ -1228,7 +1228,6 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = mode; - einfo.ei_flags = ast_flags; einfo.ei_cb_bl = ll_extent_lock_callback; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = ll_glimpse_callback; @@ -1237,8 +1236,9 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, oinfo.oi_policy = *policy; oinfo.oi_lockh = lockh; oinfo.oi_md = lsm; + oinfo.oi_flags = ast_flags; - rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo); + rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL); *policy = oinfo.oi_policy; if (rc > 0) rc = -EIO; @@ -2029,6 +2029,9 @@ static int join_file(struct inode *head_inode, struct file *head_filp, struct dentry *tail_dentry = tail_filp->f_dentry; struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = head_filp->f_flags|O_JOIN_FILE}; + struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW, + ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL }; + struct lustre_handle lockh; struct md_op_data *op_data; int rc; @@ -2046,9 +2049,8 @@ static int join_file(struct inode *head_inode, struct file *head_filp, if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); - rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW, - op_data, &lockh, NULL, 0, ldlm_completion_ast, - ll_md_blocking_ast, NULL, 0); + rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, + op_data, &lockh, NULL, 0, 0); ll_finish_md_op_data(op_data); if (rc < 0) @@ -2426,9 +2428,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) fid_oid(ll_inode2fid(inode)), fid_ver(ll_inode2fid(inode)), LDLM_FLOCK} }; + struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL, + ldlm_flock_completion_ast, NULL, file_lock }; struct lustre_handle lockh = {0}; ldlm_policy_data_t flock; - ldlm_mode_t mode = 0; int flags = 0; int rc; ENTRY; @@ -2450,7 +2453,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) switch (file_lock->fl_type) { case F_RDLCK: - mode = LCK_PR; + einfo.ei_mode = LCK_PR; break; case F_UNLCK: /* An unlock request may or may not have any relation to @@ -2461,10 +2464,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) * information that is given with a normal read or write record * lock request. To avoid creating another ldlm unlock (cancel) * message we'll treat a LCK_NL flock request as an unlock. */ - mode = LCK_NL; + einfo.ei_mode = LCK_NL; break; case F_WRLCK: - mode = LCK_PW; + einfo.ei_mode = LCK_PW; break; default: CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type); @@ -2491,7 +2494,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) flags = LDLM_FL_TEST_LOCK; /* Save the old mode so that if the mode in the lock changes we * can decrement the appropriate reader or writer refcount. */ - file_lock->fl_type = mode; + file_lock->fl_type = einfo.ei_mode; break; default: CERROR("unknown fcntl lock command: %d\n", cmd); @@ -2500,12 +2503,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, " "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid, - flags, mode, flock.l_flock.start, flock.l_flock.end); + flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end); - rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &res_id, - LDLM_FLOCK, &flock, mode, &flags, NULL, - ldlm_flock_completion_ast, NULL, file_lock, - NULL, 0, NULL, &lockh, 0); + rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id, + &flock, &flags, NULL, 0, NULL, &lockh, 0); if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0)) ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW)); #ifdef HAVE_F_OP_FLOCK diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index b48e1614a452a205243b59253be82e2f91efc54c..dbebe3897b2048758fb600046ffbc46585f0ad61 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1432,11 +1432,9 @@ static int lmv_done_writing(struct obd_export *exp, } static int -lmv_enqueue_slaves(struct obd_export *exp, int locktype, - struct lookup_intent *it, int lockmode, - struct md_op_data *op_data, struct lustre_handle *lockh, - void *lmm, int lmmsize, ldlm_completion_callback cb_compl, - ldlm_blocking_callback cb_blocking, void *cb_data) +lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct md_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1463,9 +1461,8 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype, if (tgt_exp == NULL) continue; - rc = md_enqueue(tgt_exp, locktype, it, lockmode, op_data2, - lockh + i, lmm, lmmsize, cb_compl, cb_blocking, - cb_data, 0); + rc = md_enqueue(tgt_exp, einfo, it, op_data2, + lockh + i, lmm, lmmsize, 0); CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n", PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status); @@ -1491,7 +1488,7 @@ cleanup: /* drop all taken locks */ while (--i >= 0) { if (lockh[i].cookie) - ldlm_lock_decref(lockh + i, lockmode); + ldlm_lock_decref(lockh + i, einfo->ei_mode); lockh[i].cookie = 0; } } @@ -1499,11 +1496,9 @@ cleanup: } static int -lmv_enqueue_remote(struct obd_export *exp, int lock_type, - struct lookup_intent *it, int lock_mode, - struct md_op_data *op_data, struct lustre_handle *lockh, - void *lmm, int lmmsize, ldlm_completion_callback cb_compl, - ldlm_blocking_callback cb_blocking, void *cb_data, +lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct md_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize, int extra_lock_flags) { struct ptlrpc_request *req = it->d.lustre.it_data; @@ -1550,9 +1545,8 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type, rdata->op_fid1 = fid_copy; rdata->op_bias = MDS_CROSS_REF; - rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata, - lockh, lmm, lmmsize, cb_compl, cb_blocking, - cb_data, extra_lock_flags); + rc = md_enqueue(tgt_exp, einfo, it, rdata, lockh, + lmm, lmmsize, extra_lock_flags); OBD_FREE_PTR(rdata); EXIT; out: @@ -1561,11 +1555,9 @@ out: } static int -lmv_enqueue(struct obd_export *exp, int lock_type, - struct lookup_intent *it, int lock_mode, - struct md_op_data *op_data, struct lustre_handle *lockh, - void *lmm, int lmmsize, ldlm_completion_callback cb_compl, - ldlm_blocking_callback cb_blocking, void *cb_data, +lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct md_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize, int extra_lock_flags) { struct obd_device *obd = exp->exp_obd; @@ -1580,9 +1572,8 @@ lmv_enqueue(struct obd_export *exp, int lock_type, RETURN(rc); if (op_data->op_mea1 && it->it_op == IT_UNLINK) { - rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode, - op_data, lockh, lmm, lmmsize, - cb_compl, cb_blocking, cb_data); + rc = lmv_enqueue_slaves(exp, einfo, it, op_data, + lockh, lmm, lmmsize); RETURN(rc); } @@ -1611,15 +1602,12 @@ lmv_enqueue(struct obd_export *exp, int lock_type, CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it), PFID(&op_data->op_fid1)); - rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh, - lmm, lmmsize, cb_compl, cb_blocking, cb_data, - extra_lock_flags); + rc = md_enqueue(tgt_exp, einfo, it, op_data, lockh, + lmm, lmmsize, extra_lock_flags); if (rc == 0 && it->it_op == IT_OPEN) - rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode, - op_data, lockh, lmm, lmmsize, - cb_compl, cb_blocking, cb_data, - extra_lock_flags); + rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh, + lmm, lmmsize, extra_lock_flags); RETURN(rc); } @@ -1782,7 +1770,8 @@ static int lmv_early_cancel_stripes(struct obd_export *exp, st_fid = &obj->lo_inodes[i].li_fid; if (tgt_exp != st_exp) { rc = md_cancel_unused(st_exp, st_fid, &policy, - mode, 0, NULL); + mode, LDLM_FL_ASYNC, + NULL); if (rc) break; } else { diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index a8711e022acee70cdf5cc753ed3964260a8eadcc..721dfa54985130da3b6f20cca4b95c490dff7a14 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -36,7 +36,7 @@ struct lov_request { }; struct lov_request_set { - struct obd_enqueue_info *set_ei; + struct ldlm_enqueue_info*set_ei; struct obd_info *set_oi; atomic_t set_refcount; struct obd_export *set_exp; @@ -200,9 +200,10 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *obd_info, obd_off end, struct lov_request_set **reqset); int lov_fini_sync_set(struct lov_request_set *set); int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo, + struct ldlm_enqueue_info *einfo, struct lov_request_set **reqset); -int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc); +int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, + struct ptlrpc_request_set *rqset); int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_stripe_md *lsm, ldlm_policy_data_t *policy, __u32 mode, diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 5e6e1bb4ad466c03b79ec62e5f4db96a7c1776af..c31295e9720146f138efc9060de724f8576f80d9 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1838,12 +1838,13 @@ static int lov_enqueue_interpret(struct ptlrpc_request_set *rqset, { struct lov_request_set *lovset = (struct lov_request_set *)data; ENTRY; - rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc); + rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc, rqset); RETURN(rc); } static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *rqset) { struct lov_request_set *set; struct lov_request *req; @@ -1856,7 +1857,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo, ASSERT_LSM_MAGIC(oinfo->oi_md); /* we should never be asked to replay a lock this way. */ - LASSERT((einfo->ei_flags & LDLM_FL_REPLAY) == 0); + LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0); if (!exp || !exp->exp_obd) RETURN(-ENODEV); @@ -1870,20 +1871,20 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo, req = list_entry(pos, struct lov_request, rq_link); rc = obd_enqueue(lov->lov_tgts[req->rq_idx]->ltd_exp, - &req->rq_oi, einfo); + &req->rq_oi, einfo, rqset); if (rc != ELDLM_OK) GOTO(out, rc); } - if (einfo->ei_rqset && !list_empty(&einfo->ei_rqset->set_requests)) { + if (rqset && !list_empty(&rqset->set_requests)) { LASSERT(rc == 0); - LASSERT(einfo->ei_rqset->set_interpret == NULL); - einfo->ei_rqset->set_interpret = lov_enqueue_interpret; - einfo->ei_rqset->set_arg = (void *)set; + LASSERT(rqset->set_interpret == NULL); + rqset->set_interpret = lov_enqueue_interpret; + rqset->set_arg = (void *)set; RETURN(rc); } out: - rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc); + rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc, rqset); RETURN(rc); } diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 88a8b5736b91bb0a42d7b9b4a0832b4133152335..b702f7ff52c68e5df7fbfa0432ff029ced3565b8 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -161,7 +161,7 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) ldlm_lock_allow_match(lock); LDLM_LOCK_PUT(lock); } else if ((rc == ELDLM_LOCK_ABORTED) && - (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) { + (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) { memset(lov_lockhp, 0, sizeof(*lov_lockhp)); lov_stripe_lock(set->set_oi->oi_md); loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb; @@ -192,7 +192,7 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) /* The callback for osc_enqueue that updates lov info for every OSC request. */ static int cb_update_enqueue(struct obd_info *oinfo, int rc) { - struct obd_enqueue_info *einfo; + struct ldlm_enqueue_info *einfo; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); @@ -237,7 +237,8 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode) RETURN(rc); } -int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc) +int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, + struct ptlrpc_request_set *rqset) { int ret = 0; ENTRY; @@ -247,7 +248,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc) LASSERT(set->set_exp); /* Do enqueue_done only for sync requests and if any request * succeeded. */ - if (!set->set_ei->ei_rqset) { + if (!rqset) { if (rc) set->set_completes = 0; ret = enqueue_done(set, mode); @@ -261,7 +262,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc) } int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo, + struct ldlm_enqueue_info *einfo, struct lov_request_set **reqset) { struct lov_obd *lov = &exp->exp_obd->u.lov; @@ -321,6 +322,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, /* Set lov request specific parameters. */ req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i; req->rq_oi.oi_cb_up = cb_update_enqueue; + req->rq_oi.oi_flags = oinfo->oi_flags; LASSERT(req->rq_oi.oi_lockh); @@ -348,7 +350,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, *reqset = set; RETURN(0); out_set: - lov_fini_enqueue_set(set, einfo->ei_mode, rc); + lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL); RETURN(rc); } diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index 2b4122d809d26002651a6fd57fdcbc7cfc8e0cec..a23511d40fbb5d4acfe1c7cbc3e7ba5701f413f0 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -105,17 +105,11 @@ int mdc_intent_lock(struct obd_export *exp, struct lookup_intent *, int, struct ptlrpc_request **reqp, ldlm_blocking_callback cb_blocking, int extra_lock_flags); -int mdc_enqueue(struct obd_export *exp, - int lock_type, - struct lookup_intent *it, - int lock_mode, - struct md_op_data *op_data, - struct lustre_handle *lockh, - void *lmm, - int lmmlen, - ldlm_completion_callback cb_completion, - ldlm_blocking_callback cb_blocking, - void *cb_data, int extra_lock_flags); +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct md_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmlen, + int extra_lock_flags); + int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid, struct list_head *cancels, ldlm_mode_t mode, __u64 bits); diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 8b2961d3adabce78e0b5605ff82876165236e3e3..6c28587e7defd4d93b958f6717dc9e21632648d3 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -246,17 +246,10 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req, /* We always reserve enough space in the reply packet for a stripe MD, because * we don't know in advance the file type. */ -int mdc_enqueue(struct obd_export *exp, - int lock_type, - struct lookup_intent *it, - int lock_mode, - struct md_op_data *op_data, - struct lustre_handle *lockh, - void *lmm, - int lmmsize, - ldlm_completion_callback cb_completion, - ldlm_blocking_callback cb_blocking, - void *cb_data, int extra_lock_flags) +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct md_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize, + int extra_lock_flags) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); @@ -276,14 +269,13 @@ int mdc_enqueue(struct obd_export *exp, [DLM_LOCKREPLY_OFF] = sizeof(*lockrep), [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body), [DLM_REPLY_REC_OFF+1] = obddev->u.cli. - cl_max_mds_easize }; + cl_max_mds_easize, + 0, 0, 0 }; int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; int repbufcnt = 4, rc; ENTRY; - LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type); -// LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu", -// ldlm_it2str(it->it_op), it_name, it_inode->i_ino); + LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); if (it->it_op & IT_OPEN) { int do_join = !!(it->it_flags & O_JOIN_FILE); @@ -438,9 +430,8 @@ int mdc_enqueue(struct obd_export *exp, * rpcs in flight counter */ mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); mdc_enter_request(&obddev->u.cli); - rc = ldlm_cli_enqueue(exp, &req, &res_id, lock_type, &policy, - lock_mode, &flags, cb_blocking, cb_completion, - NULL, cb_data, NULL, 0, NULL, lockh, 0); + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, + 0, NULL, lockh, 0); mdc_exit_request(&obddev->u.cli); mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); @@ -455,7 +446,7 @@ int mdc_enqueue(struct obd_export *exp, /* This can go when we're sure that this can never happen */ LASSERT(rc != -ENOENT); if (rc == ELDLM_LOCK_ABORTED) { - lock_mode = 0; + einfo->ei_mode = 0; memset(lockh, 0, sizeof(*lockh)); rc = 0; } else if (rc != 0) { @@ -470,10 +461,10 @@ int mdc_enqueue(struct obd_export *exp, /* If the server gave us back a different lock mode, we should * fix up our variables. */ - if (lock->l_req_mode != lock_mode) { + if (lock->l_req_mode != einfo->ei_mode) { ldlm_lock_addref(lockh, lock->l_req_mode); - ldlm_lock_decref(lockh, lock_mode); - lock_mode = lock->l_req_mode; + ldlm_lock_decref(lockh, einfo->ei_mode); + einfo->ei_mode = lock->l_req_mode; } LDLM_LOCK_PUT(lock); } @@ -485,7 +476,7 @@ int mdc_enqueue(struct obd_export *exp, it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1; it->d.lustre.it_status = (int)lockrep->lock_policy_res2; - it->d.lustre.it_lock_mode = lock_mode; + it->d.lustre.it_lock_mode = einfo->ei_mode; it->d.lustre.it_data = req; if (it->d.lustre.it_status < 0 && req->rq_replay) @@ -740,6 +731,10 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, * this and use the request from revalidate. In this case, revalidate * never dropped its reference, so the refcounts are all OK */ if (!it_disposition(it, DISP_ENQ_COMPLETE)) { + struct ldlm_enqueue_info einfo = + { LDLM_IBITS, it_to_lock_mode(it), cb_blocking, + ldlm_completion_ast, NULL, NULL }; + /* For case if upper layer did not alloc fid, do it now. */ if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) { rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data); @@ -748,11 +743,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } } - - rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it), - op_data, &lockh, lmm, lmmsize, - ldlm_completion_ast, cb_blocking, NULL, - extra_lock_flags); + rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, + lmm, lmmsize, extra_lock_flags); if (rc < 0) RETURN(rc); memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); diff --git a/lustre/mds/mds_lib.c b/lustre/mds/mds_lib.c index ff2346cde92afc72418b25c6157cdcd05432e24d..83203dedac0e36438753efdcb1df7e3f289de756 100644 --- a/lustre/mds/mds_lib.c +++ b/lustre/mds/mds_lib.c @@ -122,21 +122,25 @@ static int mds_setattr_unpack(struct ptlrpc_request *req, int offset, r->ur_flags = rec->sa_attr_flags; LASSERT_REQSWAB (req, offset + 1); - if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 1) { + r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); + if (r->ur_eadatalen) { r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 1, 0); if (r->ur_eadata == NULL) RETURN(-EFAULT); - r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); } - - if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) { + r->ur_cookielen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + if (r->ur_cookielen) { r->ur_logcookies = lustre_msg_buf(req->rq_reqmsg, offset + 2,0); if (r->ur_eadata == NULL) RETURN (-EFAULT); - - r->ur_cookielen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); } - + if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 3, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); + } RETURN(0); } @@ -172,7 +176,8 @@ static int mds_create_unpack(struct ptlrpc_request *req, int offset, r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); LASSERT_REQSWAB(req, offset + 2); - if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) { + r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + if (r->ur_tgtlen) { /* NB for now, we only seem to pass NULL terminated symlink * target strings here. If this ever changes, we'll have * to stop checking for a buffer filled completely with a @@ -183,7 +188,13 @@ static int mds_create_unpack(struct ptlrpc_request *req, int offset, r->ur_tgt = lustre_msg_string(req->rq_reqmsg, offset + 2, 0); if (r->ur_tgt == NULL) RETURN (-EFAULT); - r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + } + if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 3, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); } RETURN(0); } @@ -215,6 +226,13 @@ static int mds_link_unpack(struct ptlrpc_request *req, int offset, if (r->ur_name == NULL) RETURN (-EFAULT); r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); + if (lustre_msg_buflen(req->rq_reqmsg, offset + 2)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 2, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); + } RETURN(0); } @@ -246,6 +264,14 @@ static int mds_unlink_unpack(struct ptlrpc_request *req, int offset, if (r->ur_name == NULL) RETURN(-EFAULT); r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); + + if (lustre_msg_buflen(req->rq_reqmsg, offset + 2)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 2, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); + } RETURN(0); } @@ -282,6 +308,13 @@ static int mds_rename_unpack(struct ptlrpc_request *req, int offset, if (r->ur_tgt == NULL) RETURN(-EFAULT); r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) { + r->ur_dlm = lustre_swab_reqbuf(req, offset + 3, + sizeof(*r->ur_dlm), + lustre_swab_ldlm_request); + if (r->ur_dlm == NULL) + RETURN (-EFAULT); + } RETURN(0); } @@ -317,11 +350,11 @@ static int mds_open_unpack(struct ptlrpc_request *req, int offset, r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1); LASSERT_REQSWAB(req, offset + 2); - if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) { + r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); + if (r->ur_eadatalen) { r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 2, 0); if (r->ur_eadata == NULL) RETURN (-EFAULT); - r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 2); } RETURN(0); } diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index d30103c96a24f7b5cd66ddf8beeec9eb600487de..6e2688bda80a4f23b211abb18ecd0c72453c9f3f 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -523,6 +523,9 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req)); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN || (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)) { de = mds_fid2dentry(mds, rec->ur_fid1, NULL); @@ -791,6 +794,9 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) GOTO(cleanup, rc = -ESTALE); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX, &lockh, MDS_INODELOCK_UPDATE); if (IS_ERR(dparent)) { @@ -1577,6 +1583,9 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) GOTO(cleanup, rc = -ENOENT); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, &parent_lockh, &dparent, LCK_EX, MDS_INODELOCK_UPDATE, @@ -1814,6 +1823,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) GOTO(cleanup, rc = -ENOENT); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + /* Step 1: Lookup the source inode and target directory by FID */ de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL); if (IS_ERR(de_src)) @@ -2155,6 +2167,9 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); + if (rec->ur_dlm) + ldlm_request_cancel(req, rec->ur_dlm, 0); + rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir, rec->ur_fid2, &de_tgtdir, LCK_EX, rec->ur_name, rec->ur_namelen, diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 34466b8dd47009076acd51ef4b6bb2576e577f85..676df34396ffa66395b36a8bb387ccf74325f58c 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -741,16 +741,16 @@ static int mdt_rename_lock(struct mdt_thread_info *info, ldlm_completion_ast, NULL, NULL, 0, NULL, lh); } else { + struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX, + ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL }; int flags = 0; /* * This is the case mdt0 is remote node, issue DLM lock like * other clients. */ - rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, res_id, - LDLM_IBITS, policy, LCK_EX, &flags, - ldlm_blocking_ast, ldlm_completion_ast, - NULL, NULL, NULL, 0, NULL, lh, 0); + rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, &einfo, res_id, + policy, &flags, NULL, 0, NULL, lh, 0); } RETURN(rc); diff --git a/lustre/mgc/mgc_request.c b/lustre/mgc/mgc_request.c index 5de0f959c4ab316e7e6980df6454906bb98ad87e..e25539ac0987df180bd5e8699a02b3a44a62cc12 100644 --- a/lustre/mgc/mgc_request.c +++ b/lustre/mgc/mgc_request.c @@ -596,6 +596,9 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, struct lustre_handle *lockh) { struct config_llog_data *cld = (struct config_llog_data *)data; + struct ldlm_enqueue_info einfo = { type, mode, mgc_blocking_ast, + ldlm_completion_ast, NULL, data}; + int rc; ENTRY; @@ -609,10 +612,8 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, /* We need a callback for every lockholder, so don't try to ldlm_lock_match (see rev 1.1.2.11.2.47) */ - rc = ldlm_cli_enqueue(exp, NULL, &cld->cld_resid, - type, NULL, mode, flags, - mgc_blocking_ast, ldlm_completion_ast, NULL, - data, NULL, 0, NULL, lockh, 0); + rc = ldlm_cli_enqueue(exp, NULL, &einfo, &cld->cld_resid, + NULL, flags, NULL, 0, NULL, lockh, 0); /* A failed enqueue should still call the mgc_blocking_ast, where it will be requeued if needed ("grant failed"). */ diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 1dcb3e06d9265df1254d7317f00d2500f9d3ca41..ef16000680fd1e8956295912d84ffa2b1392a39f 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -1102,7 +1102,7 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa, struct obd_device *obd = exp->exp_obd; struct echo_client_obd *ec = &obd->u.echo_client; struct lustre_handle *ulh = obdo_handle (oa); - struct obd_enqueue_info einfo = { 0 }; + struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ec_object *eco; struct ec_lock *ecl; @@ -1140,7 +1140,7 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa, oinfo.oi_policy = ecl->ecl_policy; oinfo.oi_lockh = &ecl->ecl_lock_handle; oinfo.oi_md = eco->eco_lsm; - rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo); + rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo, NULL); if (rc != 0) goto failed_1; diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index b53ea58a70853ed53680dd37fd1482fc1fc6214c..a3aa191e63385d9e7c3d8e70cb8ba9c879b64c52 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1706,8 +1706,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * * Of course, this will all disappear when we switch to * taking liblustre locks on the OST. */ - if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update) - ns->ns_lvbo->lvbo_update(res, NULL, 0, 1); + ldlm_res_lvbo_update(res, NULL, 0, 1); } RETURN(ELDLM_LOCK_ABORTED); } @@ -1733,8 +1732,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * XXX nikita: situation when ldlm_server_glimpse_ast() failed before * sending ast is not handled. This can result in lost client writes. */ - if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) - ns->ns_lvbo->lvbo_update(res, NULL, 0, 1); + if (rc != 0) + ldlm_res_lvbo_update(res, NULL, 0, 1); lock_res(res); *reply_lvb = *res_lvb; @@ -3007,7 +3006,6 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, { struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id, 0, oinfo->oi_oa->o_gr, 0 } }; - struct ldlm_valblock_ops *ns_lvbo; struct filter_mod_data *fmd; struct lvfs_run_ctxt saved; struct filter_obd *filter; @@ -3047,9 +3045,7 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, &res_id, LDLM_EXTENT, 0); if (res != NULL) { - ns_lvbo = res->lr_namespace->ns_lvbo; - if (ns_lvbo && ns_lvbo->lvbo_update) - rc = ns_lvbo->lvbo_update(res, NULL, 0, 0); + rc = ldlm_res_lvbo_update(res, NULL, 0, 0); ldlm_resource_putref(res); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 8abd48d0e6b38606789352f9a38ae33623d52658..6ca5e6f34f446dd04ca13dfa0ffa9d53368ea4a3 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2779,7 +2779,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo, static int osc_enqueue_interpret(struct ptlrpc_request *req, struct osc_enqueue_args *aa, int rc) { - int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT; + int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT; struct lov_stripe_md *lsm = aa->oa_oi->oi_md; struct ldlm_lock *lock; @@ -2790,7 +2790,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req, /* Complete obtaining the lock procedure. */ rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, aa->oa_ei->ei_mode, - &aa->oa_ei->ei_flags, + &aa->oa_oi->oi_flags, &lsm->lsm_oinfo[0]->loi_lvb, sizeof(lsm->lsm_oinfo[0]->loi_lvb), lustre_swab_ost_lvb, @@ -2817,13 +2817,14 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req, * is excluded from the cluster -- such scenarious make the life difficult, so * release locks just after they are obtained. */ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, - struct obd_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo, + struct ptlrpc_request_set *rqset) { struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obd = exp->exp_obd; struct ldlm_reply *rep; struct ptlrpc_request *req = NULL; - int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT; + int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT; int rc; ENTRY; @@ -2841,12 +2842,12 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, /* Next, search for already existing extent locks that will cover us */ rc = ldlm_lock_match(obd->obd_namespace, - einfo->ei_flags | LDLM_FL_LVB_READY, &res_id, + oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id, einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode, oinfo->oi_lockh); if (rc == 1) { osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata, - einfo->ei_flags); + oinfo->oi_flags); if (intent) { /* I would like to be able to ASSERT here that rss <= * kms, but I can't, for reasons which are explained in @@ -2857,7 +2858,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, oinfo->oi_cb_up(oinfo, ELDLM_OK); /* For async requests, decref the lock. */ - if (einfo->ei_rqset) + if (rqset) ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode); RETURN(ELDLM_OK); @@ -2877,7 +2878,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, if (einfo->ei_mode == LCK_PR) { rc = ldlm_lock_match(obd->obd_namespace, - einfo->ei_flags | LDLM_FL_LVB_READY, + oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id, einfo->ei_type, &oinfo->oi_policy, LCK_PW, oinfo->oi_lockh); if (rc == 1) { @@ -2885,11 +2886,11 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, * be more elegant than adding another parameter to * lock_match. I want a second opinion. */ /* addref the lock only if not async requests. */ - if (!einfo->ei_rqset) + if (!rqset) ldlm_lock_addref(oinfo->oi_lockh, LCK_PR); osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata, - einfo->ei_flags); + oinfo->oi_flags); oinfo->oi_cb_up(oinfo, ELDLM_OK); ldlm_lock_decref(oinfo->oi_lockh, LCK_PW); RETURN(ELDLM_OK); @@ -2914,18 +2915,15 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, } /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */ - einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED; + oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED; - rc = ldlm_cli_enqueue(exp, &req, &res_id, einfo->ei_type, - &oinfo->oi_policy, einfo->ei_mode, - &einfo->ei_flags, einfo->ei_cb_bl, - einfo->ei_cb_cp, einfo->ei_cb_gl, - einfo->ei_cbdata, + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, + &oinfo->oi_policy, &oinfo->oi_flags, &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb), lustre_swab_ost_lvb, oinfo->oi_lockh, - einfo->ei_rqset ? 1 : 0); - if (einfo->ei_rqset) { + rqset ? 1 : 0); + if (rqset) { if (!rc) { struct osc_enqueue_args *aa; CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); @@ -2935,7 +2933,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, aa->oa_exp = exp; req->rq_interpret_reply = osc_enqueue_interpret; - ptlrpc_set_add_req(einfo->ei_rqset, req); + ptlrpc_set_add_req(rqset, req); } else if (intent) { ptlrpc_req_finished(req); } diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 3d3e732fdd3d8759ea1971a2e0ac0292ce92d949..5e6a6cc7dd74b62b336eebae2ecf2c1bb82cf965 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -467,9 +467,10 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_QUOTA64 == 0x00080000ULL); CLASSERT(OBD_CONNECT_MDS_CAPA == 0x00100000ULL); CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL); - CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL); + CLASSERT(OBD_CONNECT_CANCELSET == 0x00400000ULL); CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL); - CLASSERT(OBD_CONNECT_CANCELSET == 0x01000000ULL); + CLASSERT(OBD_CONNECT_AT == 0x01000000ULL); + CLASSERT(OBD_CONNECT_MDS_MDS == 0x02000000ULL); /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n", diff --git a/lustre/tests/recovery-small.sh b/lustre/tests/recovery-small.sh index ee6ebd769143815e48f0a88b502af35413881d61..c5a7a5500cf467e938b9f98e75292d198648388a 100755 --- a/lustre/tests/recovery-small.sh +++ b/lustre/tests/recovery-small.sh @@ -854,6 +854,19 @@ test_58() { # bug 11546 } run_test 58 "Eviction in the middle of open RPC reply processing" +test_59() { # bug 10589 + zconf_mount `hostname` $MOUNT2 || error "Failed to mount $MOUNT2" +#define OBD_FAIL_LDLM_CANCEL_EVICT_RACE 0x311 + sysctl -w lustre.fail_loc=0x311 + writes=`dd if=/dev/zero of=$DIR2/$tfile count=1 2>&1 | awk 'BEGIN { FS="+" } /out/ {print $1}'` + sysctl -w lustre.fail_loc=0 + sync + zconf_umount `hostname` $DIR2 -f + reads=`dd if=$DIR/$tfile of=/dev/null 2>&1 | awk 'BEGIN { FS="+" } /in/ {print $1}'` + [ $reads -eq $writes ] || error "read" $reads "blocks, must be" $writes +} +run_test 59 "Read cancel race on client eviction" + equals_msg `basename $0`: test complete, cleaning up check_and_cleanup_lustre [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 05bfe233c50dfff0c767457bb31b9bd60a1bb6c2..a1d5103e08b5d2c3efebbb4436e72b0209d5f628 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -4019,7 +4019,7 @@ test_119b() # bug 11737 } run_test 119b "Sparse directIO read must return actual read amount" -test_119a() { +test_120a() { mkdir $DIR/$tdir cancel_lru_locks mdc stat $DIR/$tdir > /dev/null @@ -4031,9 +4031,9 @@ test_119a() { [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." } -run_test 119a "Early Lock Cancel: mkdir test" +run_test 120a "Early Lock Cancel: mkdir test" -test_119b() { +test_120b() { mkdir $DIR/$tdir cancel_lru_locks mdc stat $DIR/$tdir > /dev/null @@ -4045,9 +4045,9 @@ test_119b() { [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." } -run_test 119b "Early Lock Cancel: create test" +run_test 120b "Early Lock Cancel: create test" -test_119c() { +test_120c() { mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2 touch $DIR/$tdir/d1/f1 cancel_lru_locks mdc @@ -4060,9 +4060,9 @@ test_119c() { [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." } -run_test 119c "Early Lock Cancel: link test" +run_test 120c "Early Lock Cancel: link test" -test_119d() { +test_120d() { touch $DIR/$tdir cancel_lru_locks mdc stat $DIR/$tdir > /dev/null @@ -4074,9 +4074,9 @@ test_119d() { [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." } -run_test 119d "Early Lock Cancel: setattr test" +run_test 120d "Early Lock Cancel: setattr test" -test_119e() { +test_120e() { mkdir $DIR/$tdir dd if=/dev/zero of=$DIR/$tdir/f1 count=1 cancel_lru_locks mdc @@ -4091,9 +4091,9 @@ test_119e() { [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." } -run_test 119e "Early Lock Cancel: unlink test" +run_test 120e "Early Lock Cancel: unlink test" -test_119f() { +test_120f() { mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2 dd if=/dev/zero of=$DIR/$tdir/d1/f1 count=1 dd if=/dev/zero of=$DIR/$tdir/d2/f2 count=1 @@ -4110,9 +4110,9 @@ test_119f() { [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured." [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured." } -run_test 119f "Early Lock Cancel: rename test" +run_test 120f "Early Lock Cancel: rename test" -test_119g() { +test_120g() { count=10000 echo create $count files mkdir $DIR/$tdir @@ -4139,7 +4139,20 @@ test_119g() { sleep 2 # wait for commitment of removal } -run_test 119g "Early Lock Cancel: performance test" +run_test 120g "Early Lock Cancel: performance test" + +test_121() { #bug #10589 + rm -rf $DIR/$tfile + writes=`dd if=/dev/zero of=$DIR/$tfile count=1 2>&1 | awk 'BEGIN { FS="+" } /out/ {print $1}'` +#define OBD_FAIL_LDLM_CANCEL_RACE 0x310 + sysctl -w lustre.fail_loc=0x310 + cancel_lru_locks osc > /dev/null + reads=`dd if=$DIR/$tfile of=/dev/null 2>&1 | awk 'BEGIN { FS="+" } /in/ {print $1}'` + sysctl -w lustre.fail_loc=0 + [ $reads -eq $writes ] || error "read" $reads "blocks, must be" $writes +} +run_test 121 "read cancel race =========" + TMPDIR=$OLDTMPDIR TMP=$OLDTMP diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 4fd177b323f651e356376872b96cac8cdd255c35..ff3cde161434d25b1028ebed8e1f2308571ae59f 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -180,6 +180,7 @@ static void check_obd_connect_data(void) CHECK_CDEFINE(OBD_CONNECT_OSS_CAPA); CHECK_CDEFINE(OBD_CONNECT_MDS_MDS); CHECK_CDEFINE(OBD_CONNECT_SOM); + CHECK_CDEFINE(OBD_CONNECT_AT); CHECK_CDEFINE(OBD_CONNECT_CANCELSET); } diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 66065c3ada528d02efa1399fa2820bde1dbe8f4e..d7c257432302f2a1b6723166058944712807244a 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -483,9 +483,10 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_QUOTA64 == 0x00080000ULL); CLASSERT(OBD_CONNECT_MDS_CAPA == 0x00100000ULL); CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL); - CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL); + CLASSERT(OBD_CONNECT_CANCELSET == 0x00400000ULL); CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL); - CLASSERT(OBD_CONNECT_CANCELSET == 0x01000000ULL); + CLASSERT(OBD_CONNECT_AT == 0x01000000ULL); + CLASSERT(OBD_CONNECT_MDS_MDS == 0x02000000ULL); /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",