diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 0463a3f9d0a3ff4ee64933698f09b80c5f5bac75..791f7bd547b87a403dceb04dcf4369352d206c3a 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -637,9 +637,10 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_allow_match(struct ldlm_lock *lock);
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags, struct ldlm_res_id *,
-                    ldlm_type_t type, ldlm_policy_data_t *, ldlm_mode_t mode,
-                    struct lustre_handle *);
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+                            struct ldlm_res_id *, ldlm_type_t type,
+                            ldlm_policy_data_t *, ldlm_mode_t mode,
+                            struct lustre_handle *);
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         int *flags);
 void ldlm_lock_cancel(struct ldlm_lock *lock);
diff --git a/lustre/include/lustre_handles.h b/lustre/include/lustre_handles.h
index ac56c27fa70adaab726740cc9a026e198ec06961..918617391caae201679586f6518c5976cb4e7ad2 100644
--- a/lustre/include/lustre_handles.h
+++ b/lustre/include/lustre_handles.h
@@ -32,10 +32,12 @@ struct portals_handle {

         /* newly added fields to handle the RCU issue. -jxiong */
         spinlock_t h_lock;
-        unsigned int h_size;
         void *h_ptr;
         void (*h_free_cb)(void *, size_t);
         struct rcu_head h_rcu;
+        unsigned int h_size;
+        __u8 h_in:1;
+        __u8 h_unused[3];
 };

 #define RCU2HANDLE(rcu) container_of(rcu, struct portals_handle, h_rcu)
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 4e62e68e9a7892c01551281168314fb4b42a482d..1a51e03d52d1b3e24425796238a17736bfc78a06 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -901,7 +901,8 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)

 /* returns a referenced lock or NULL.  See the flag descriptions below, in the
  * comment above ldlm_lock_match */
-static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
+static struct ldlm_lock *search_queue(struct list_head *queue,
+                                      ldlm_mode_t *mode,
                                       ldlm_policy_data_t *policy,
                                       struct ldlm_lock *old_lock, int flags)
 {
@@ -909,6 +910,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
         struct list_head *tmp;

         list_for_each(tmp, queue) {
+                ldlm_mode_t match;
+
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                 if (lock == old_lock)
@@ -927,16 +930,17 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
                     lock->l_readers == 0 && lock->l_writers == 0)
                         continue;

-                if (!(lock->l_req_mode & mode))
+                if (!(lock->l_req_mode & *mode))
                         continue;
+                match = lock->l_req_mode;

                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
                     (lock->l_policy_data.l_extent.start > policy->l_extent.start ||
                      lock->l_policy_data.l_extent.end < policy->l_extent.end))
                         continue;

-                if (unlikely(mode == LCK_GROUP) &&
+                if (unlikely(match == LCK_GROUP) &&
                     lock->l_resource->lr_type == LDLM_EXTENT &&
                     lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                         continue;

@@ -960,8 +964,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
                         LDLM_LOCK_GET(lock);
                         ldlm_lock_touch_in_lru(lock);
                 } else {
-                        ldlm_lock_addref_internal_nolock(lock, mode);
+                        ldlm_lock_addref_internal_nolock(lock, match);
                 }
+                *mode = match;
                 return lock;
         }

@@ -996,10 +1001,10 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock)
  * Returns 1 if it finds an already-existing lock that is compatible; in this
  * case, lockh is filled in with a addref()ed lock
  */
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
-                    struct ldlm_res_id *res_id, ldlm_type_t type,
-                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                    struct lustre_handle *lockh)
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+                            struct ldlm_res_id *res_id, ldlm_type_t type,
+                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                            struct lustre_handle *lockh)
 {
         struct ldlm_resource *res;
         struct ldlm_lock *lock, *old_lock = NULL;
@@ -1024,15 +1029,15 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,

         lock_res(res);

-        lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_granted, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
         if (flags & LDLM_FL_BLOCK_GRANTED)
                 GOTO(out, rc = 0);
-        lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_converting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
-        lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);

@@ -1088,7 +1093,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
         if (flags & LDLM_FL_TEST_LOCK && rc)
                 LDLM_LOCK_PUT(lock);

-        return rc;
+        return rc ? mode : 0;
 }

 /* Returns a referenced lock */
diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c
index 5425d6bff18bfc7c9550ee45884c06205ec9a2a4..8a5966698c374b6904733d7f2f83a8d2b91172b6 100644
--- a/lustre/liblustre/super.c
+++ b/lustre/liblustre/super.c
@@ -399,7 +399,7 @@ static int llu_have_md_lock(struct inode *inode, __u64 lockpart)
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;

         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
-                            &policy, LCK_PW | LCK_PR, &lockh)) {
+                            &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
                 RETURN(1);
         }
         RETURN(0);
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index a0ea289393e6be419afd9edbece9f845975c56f7..595daad66dcdbf5597cfffe29775707534afd942 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -2490,7 +2490,7 @@ int ll_have_md_lock(struct inode *inode, __u64 bits)
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;

         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
-                            &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
+                            &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
                 RETURN(1);
         }

diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index d13f173b744e20cbfb603cad462c27e1e68b3207..9cacc16ec04d9b0495d5a747dbd361bfc8a24900 100644
--- a/lustre/llite/llite_lib.c
+++ b/lustre/llite/llite_lib.c
@@ -1271,7 +1271,7 @@ static int ll_setattr_do_truncate(struct inode *inode, loff_t new_size)
         ast_flags = LDLM_FL_BLOCK_GRANTED;
         rc = obd_match(sbi->ll_osc_exp, lsm, LDLM_EXTENT,
                        &policy, LCK_PW, &ast_flags, inode, &lockh);
-        if (rc == 1) {
+        if (rc > 0) {
                 local_lock = 2;
                 rc = 0;
         } else if (rc == 0) {
diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c
index 0e6c873e9937221b5c515ff7b6ac03b004d11516..ece42efe13a3c219167c50fd216a5ebb620febf8 100644
--- a/lustre/lov/lov_obd.c
+++ b/lustre/lov/lov_obd.c
@@ -1842,6 +1842,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                        struct ldlm_enqueue_info *einfo,
                        struct ptlrpc_request_set *rqset)
 {
+        ldlm_mode_t mode = einfo->ei_mode;
         struct lov_request_set *set;
         struct lov_request *req;
         struct list_head *pos;
@@ -1851,6 +1852,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,

         LASSERT(oinfo);
         ASSERT_LSM_MAGIC(oinfo->oi_md);
+        LASSERT(mode == (mode & -mode));

         /* we should never be asked to replay a lock this way. */
         LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
@@ -1880,7 +1882,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
         }
 out:
-        rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc, rqset);
+        rc = lov_fini_enqueue_set(set, mode, rc, rqset);
         RETURN(rc);
 }

@@ -1898,6 +1900,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         ENTRY;

         ASSERT_LSM_MAGIC(lsm);
+        LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode));

         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
@@ -1920,7 +1923,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                                   req->rq_oi.oi_md, type, &sub_policy,
                                   mode, &lov_flags, data, lov_lockhp);
                 rc = lov_update_match_set(set, req, rc);
-                if (rc != 1)
+                if (rc <= 0)
                         break;
         }
         lov_fini_match_set(set, mode, *flags);
diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c
index a59ddd4311f5ba2ab79625340a092be91cb94e2d..858894c93d0ce7501bd7cb0c65a14d3c596ea063 100644
--- a/lustre/lov/lov_request.c
+++ b/lustre/lov/lov_request.c
@@ -360,7 +360,7 @@ int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
         int ret = rc;
         ENTRY;

-        if (rc == 1)
+        if (rc > 0)
                 ret = 0;
         else if (rc == 0)
                 ret = 1;
diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c
index 6f2da48b0506f8cff064c6f2d861f776b5b299fd..6f3813ed72d7a742ff08d12a6690c714c0eea3a5 100644
--- a/lustre/mdc/mdc_locks.c
+++ b/lustre/mdc/mdc_locks.c
@@ -595,46 +595,33 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 }
 EXPORT_SYMBOL(mdc_enqueue);

-int mdc_revalidate_lock(struct obd_export *exp,
-                        struct lookup_intent *it,
+int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
                         struct ll_fid *fid)
 {
         /* We could just return 1 immediately, but since we should only
          * be called in revalidate_it if we already have a lock, let's
          * verify that. */
         struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
-        struct lustre_handle lockh;
-        ldlm_policy_data_t policy;
-        int mode = LCK_CR;
-        int rc;
-
-        /* As not all attributes are kept under update lock, e.g.
-           owner/group/acls are under lookup lock, we need both
-           ibits for GETATTR. */
-        policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
-                                MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
-                                MDS_INODELOCK_LOOKUP;
-
-        rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED,
-                             &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
-        if (!rc) {
-                mode = LCK_CW;
-                rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                     LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
-                                     &policy, LCK_CW, &lockh);
-        }
-        if (!rc) {
-                mode = LCK_PR;
-                rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                     LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
-                                     &policy, LCK_PR, &lockh);
-        }
-        if (rc) {
+        struct lustre_handle lockh;
+        ldlm_policy_data_t policy;
+        ldlm_mode_t mode;
+
+        /* As not all attributes are kept under update lock, e.g.
+           owner/group/acls are under lookup lock, we need both
+           ibits for GETATTR. */
+        policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
+                                MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
+                                MDS_INODELOCK_LOOKUP;
+
+        mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                               LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+                               &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
+        if (mode) {
                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
-                it->d.lustre.it_lock_mode = mode;
-        }
+                it->d.lustre.it_lock_mode = mode;
+        }

-        return rc;
+        return !!mode;
 }
 EXPORT_SYMBOL(mdc_revalidate_lock);

diff --git a/lustre/obdclass/lustre_handles.c b/lustre/obdclass/lustre_handles.c
index 89388f647797cd0551c56cf1553c9d999057c1eb..05b10c4bc94c74e90d84738dd112e5d063d81750 100644
--- a/lustre/obdclass/lustre_handles.c
+++ b/lustre/obdclass/lustre_handles.c
@@ -102,6 +102,7 @@ void class_handle_hash(struct portals_handle *h, portals_handle_addref_cb cb)
         bucket = &handle_hash[h->h_cookie & HANDLE_HASH_MASK];
         spin_lock(&bucket->lock);
         list_add_rcu(&h->h_link, &bucket->head);
+        h->h_in = 1;
         spin_unlock(&bucket->lock);

         CDEBUG(D_INFO, "added object %p with handle "LPX64" to hash\n",
@@ -121,11 +122,11 @@ static void class_handle_unhash_nolock(struct portals_handle *h)
                h, h->h_cookie);

         spin_lock(&h->h_lock);
-        if (h->h_cookie == 0) {
+        if (h->h_in == 0) {
                 spin_unlock(&h->h_lock);
                 return;
         }
-        h->h_cookie = 0;
+        h->h_in = 0;
         spin_unlock(&h->h_lock);
         list_del_rcu(&h->h_link);
 }
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index f145d8940a757321b612d99174cac0062398fe6d..04df048063bd05e25a991d897cdb45183a326adc 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -2713,8 +2713,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                 return;
         }
         lock_res_and_lock(lock);
-#ifdef __KERNEL__
-#ifdef __LINUX__
+#if defined (__KERNEL__) && defined (__LINUX__)
         /* Liang XXX: Darwin and Winnt checking should be added */
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -2728,7 +2727,6 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                        old_inode->i_state, new_inode, new_inode->i_ino,
                        new_inode->i_generation);
         }
-#endif
 #endif
         lock->l_ast_data = data;
         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
@@ -2827,6 +2825,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
         struct ldlm_reply *rep;
         struct ptlrpc_request *req = NULL;
         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
+        ldlm_mode_t mode;
         int rc;
         ENTRY;

@@ -2840,11 +2839,29 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 goto no_match;

         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace,
-                             oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
-                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
-                             oinfo->oi_lockh);
-        if (rc == 1) {
+        /* If we're trying to read, we also search for an existing PW lock. The
+         * VFS and page cache already protect us locally, so lots of readers/
+         * writers can share a single PW lock.
+         *
+         * There are problems with conversion deadlocks, so instead of
+         * converting a read lock to a write lock, we'll just enqueue a new
+         * one.
+         *
+         * At some point we should cancel the read lock instead of making them
+         * send us a blocking callback, but there are problems with canceling
+         * locks out from other users right now, too. */
+        mode = einfo->ei_mode;
+        if (einfo->ei_mode == LCK_PR)
+                mode |= LCK_PW;
+        mode = ldlm_lock_match(obd->obd_namespace,
+                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
+                               einfo->ei_type, &oinfo->oi_policy, mode,
+                               oinfo->oi_lockh);
+        if (mode) {
+                /* addref the lock only if not async requests and PW lock is
+                 * matched whereas we asked for PR. */
+                if (!rqset && einfo->ei_mode != mode)
+                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                         oinfo->oi_flags);
                 if (intent) {
@@ -2857,45 +2874,14 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                         oinfo->oi_cb_up(oinfo, ELDLM_OK);

                 /* For async requests, decref the lock. */
-                if (rqset)
+                if (einfo->ei_mode != mode)
+                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
+                else if (rqset)
                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                 RETURN(ELDLM_OK);
         }

-        /* If we're trying to read, we also search for an existing PW lock. The
-         * VFS and page cache already protect us locally, so lots of readers/
-         * writers can share a single PW lock.
-         *
-         * There are problems with conversion deadlocks, so instead of
-         * converting a read lock to a write lock, we'll just enqueue a new
-         * one.
-         *
-         * At some point we should cancel the read lock instead of making them
-         * send us a blocking callback, but there are problems with canceling
-         * locks out from other users right now, too. */
-
-        if (einfo->ei_mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace,
-                                     oinfo->oi_flags | LDLM_FL_LVB_READY,
-                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
-                                     LCK_PW, oinfo->oi_lockh);
-                if (rc == 1) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match. I want a second opinion. */
-                        /* addref the lock only if not async requests. */
-                        if (!rqset)
-                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
-                        osc_set_data_with_check(oinfo->oi_lockh,
-                                                einfo->ei_cbdata,
-                                                oinfo->oi_flags);
-                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
-                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
-                        RETURN(ELDLM_OK);
-                }
-        }
-
 no_match:
         if (intent) {
                 int size[3] = {
@@ -2952,8 +2938,8 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
 {
         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
         struct obd_device *obd = exp->exp_obd;
-        int rc;
         int lflags = *flags;
+        ldlm_mode_t rc;
         ENTRY;

         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
@@ -2964,29 +2950,23 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         policy->l_extent.end |= ~CFS_PAGE_MASK;

         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
-                             policy, mode, lockh);
-        if (rc) {
-                //if (!(*flags & LDLM_FL_TEST_LOCK))
-                        osc_set_data_with_check(lockh, data, lflags);
-                RETURN(rc);
-        }
         /* If we're trying to read, we also search for an existing PW lock. The
          * VFS and page cache already protect us locally, so lots of readers/
          * writers can share a single PW lock. */
-        if (mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
-                                     &res_id, type,
-                                     policy, LCK_PW, lockh);
-                if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match. I want a second opinion. */
-                        osc_set_data_with_check(lockh, data, lflags);
+        rc = mode;
+        if (mode == LCK_PR)
+                rc |= LCK_PW;
+        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
+                             &res_id, type, policy, rc, lockh);
+        if (rc) {
+                osc_set_data_with_check(lockh, data, lflags);
+                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                         ldlm_lock_addref(lockh, LCK_PR);
                         ldlm_lock_decref(lockh, LCK_PW);
                 }
+                RETURN(rc);
         }
+
         RETURN(rc);
 }
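Note on the interface change: ldlm_lock_match() now takes a bitmask of acceptable lock modes and returns the mode it actually matched (0 when nothing matched), instead of a 0/1 int. Callers such as ll_have_md_lock() and mdc_revalidate_lock() can therefore probe for CR/CW/PR/PW in one pass instead of one call per mode. A minimal caller sketch, assuming ns, res_id and policy are set up as in the callers above (a hypothetical fragment, not part of the patch):

        ldlm_mode_t mode;
        struct lustre_handle lockh;

        /* accept any of the four modes in a single lookup */
        mode = ldlm_lock_match(ns, LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
                               &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
        if (mode != 0) {
                /* unless LDLM_FL_TEST_LOCK was passed, the match took a
                 * reference in the matched mode, so release it with the mode
                 * that was returned, not the one we asked for */
                ldlm_lock_decref(&lockh, mode);
        }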
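The new LASSERTs in lov_enqueue() and lov_match() rely on the two's-complement identity that mode & -mode isolates the lowest set bit of mode, so mode == (mode & -mode) holds exactly when no more than one bit is set: enqueue must still be given a single mode, while match may be given a mask (and always may under LDLM_FL_TEST_LOCK). A small illustration of the idiom, using a hypothetical helper name:

        /* non-zero iff at most one mode bit is set:
         * LCK_PR passes, LCK_PR|LCK_PW does not */
        static inline int ldlm_mode_is_single(ldlm_mode_t mode)
        {
                return mode == (mode & -mode);
        }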
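When a caller asks for LCK_PR but the match returns an existing LCK_PW lock, the reference taken by ldlm_lock_match() is a PW reference, while the caller will eventually decref the PR mode it requested; osc_enqueue() and osc_match() rebalance the counts before returning, as the hunks above show. A sketch of that swap, assuming matched came back from ldlm_lock_match() and the caller keeps using the lock (requested and matched are hypothetical variable names):

        if (requested == LCK_PR && matched == LCK_PW) {
                ldlm_lock_addref(&lockh, LCK_PR);  /* mode the caller drops */
                ldlm_lock_decref(&lockh, LCK_PW);  /* mode the match took */
        }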